# -*- coding: utf-8 -*-
"""
Created on Fri Nov 29 12:22:12 2013
Collection of criteria for automatic stopping of RichLucy iterations.
The RichLucy.RichLucy function requires to call such a function as
Crit1,Crit2,indopt = calcCrit(SH,M,bdtilde,display,autostop)
with inputs
SH ....... a (maxiter,len(M)) dimensional matrix of RichLucy intermediate results
M ....... the measured spectrum
bdtilde .. the (discrete) line-spread function
display .. boolean, whether to print information to screen
autostop.. boolean, whether to actually calculate the automatic stopping
and outputs
Crit1 .... a len(maxiter) vector of criterion values (e.g. rms of progress)
Crit2 .... a len(maxiter) vector of other criterion values (e.g. curvature)
indopt ... the determined optimal stopping
If anything went wrong, indopt should be equal to -1.
The stopping rules can be tested using the function Test_CalcOptFun in the
module test_modules.py.
implemented stopping rules
* MaxCurv
* EME
* RMSthreshold
@author: Sascha Eichstaedt
"""
import sys
import numpy as np
min_auto_iter = 3 # smallest number of iterations for automatic stopping
min_curv_iter = 3 # smallest number of iterations for using MaxCurv criterion
[docs]def MaxCurv(SH,M,btilde,display=True,autostop=True):
"""
Calculate the root-mean-squared progress in the RichLucy iterations.
Determine the optimal number of iterations from the curvature of the rms values.
:param SH: ndarray of shape (N,K) with N the number of iterations and K the number of wavelengths
:param M: ndarray of shape (K,) containing the measured spectrum values (possibly after interpolation)
:param btilde: ndarray of shape (L,) containg the values of the reversed bandpass function (i.e., the line spread function)
:param autostop: (optional) int, if equal to 1 then the point of maximum curvature is calculated
:param display: (optional) boolean, if True then comments and intermediate results are printed to the command line
:param orig_mode: (optional) boolean, if True then the optimal point is the global maximum curvature, if False then the optimal
point is taken to be that local maximum with smallest error in the estimated measurement
:returns: RMS, ndarray of shape (N,) containing the root-mean-squared progress in the RL estimate per iteration
:returns: curv ndarray of shape (N,) containg the curvature values of the RMS as a function of iteration
:returns: indopt the optimal stopping point as calculated by this criterion
This criterion is published in:
Eichstaedt, Schmaehling, Wuebbeler, Anhalt, Buenger, Krueger & Elster
*Comparison of the Richardson-Lucy method and a classical approach for
spectrometer bandwidth correction*, Metrologia vol. 50, 2013
"""
from scipy.interpolate import InterpolatedUnivariateSpline
from scipy.signal import argrelmax
def calcEME(SHcandidates):
m,n = SHcandidates.shape
EstMeasDiff = np.zeros((m,))
for r in range(m):
Mest = np.convolve(SHcandidates[r],btilde,'same')
EstMeasDiff[r] = np.sqrt(np.sum( (Mest - M)**2 ) / len(M))
return EstMeasDiff
lenSH,lM = np.shape(SH)
maxiter = lenSH-1
if display:
sys.stdout.write('Calculating optimal stopping iteration using curvature information\n')
sys.stdout.write('number of estimated spectrum values = ' + repr(lM) + '\n')
sys.stdout.write('maximum number of RichLucy iterations = ' + repr(maxiter) + '\n')
RMS = np.zeros((maxiter))
if display:
sys.stdout.write('Calculating root-mean-squared value of change from iteration to iteration..')
for r in range(1,maxiter+1):
RMS[r-1] = np.sqrt( np.mean( (SH[r,:]-SH[r-1,:])**2 ) )
if display and np.mod(np.round(maxiter/r),10)==0:
sys.stdout.write('.')
if display:
sys.stdout.write('done.\n')
if maxiter>min_curv_iter:
if any(np.isnan(RMS)) or any(np.isinf(RMS)) or np.min(RMS)==np.max(RMS):
if display:
print "Calculation of automatic stopping by max curvature method failed."
return RMS,RMS,-1
if display:
sys.stdout.write('Calculating curvature of rms values.\n')
iters = np.arange(1,maxiter+1)
q = np.linspace(np.log10(iters[0]),np.log10(iters[-1]),len(RMS))
dq = q[1]-q[0]
spline_qrms = InterpolatedUnivariateSpline(np.log10(iters),np.log10(RMS))
qrms = spline_qrms(q)
if np.count_nonzero(np.isnan(qrms))>0 and display:
sys.stdout.write('Warning: In calcOptCurv interpolation of rms values resulted in ' + repr(np.count_nonzero(np.isnan(qrms))) + ' NaN entries!\n')
sys.stdout.write('Results may be inaccurate.\n')
first_derivative = np.gradient(qrms,dq)
snd_derivative = np.gradient(first_derivative,dq)
curv = snd_derivative/(1+first_derivative**2)**1.5
if autostop:
# calculate optimal q-value
qmax = argrelmax(curv)[0]
relevant_qmax = []
for qm in qmax:
if qm >= np.log10(min_auto_iter):
relevant_qmax.append(qm)
if len(relevant_qmax)>0:
indopts = []
for k in range(len(relevant_qmax)):
qopt = q[curv==curv[relevant_qmax[k]]]
qopt = qopt[0]
# transform q-value to iteration number
iopt = np.flatnonzero(np.log10(iters)<=qopt)
if iopt[-1]>min_auto_iter:
indopts.append(iters[iopt[-1]])
if display:
sys.stdout.write("Candidates for optimal iteration are \n" + repr(indopts) + "\n")
# calculate error of estimated measurement for the candidates
EME = calcEME(SH[indopts,:])
if display:
print "Error in estimated measurement for solution candidates are:\n " + repr(EME) + "\n"
optk = np.nonzero(EME == min(EME))[0]
indopt = indopts[optk]
else:
indopt = -1
else:
indopt = maxiter
else:
if display:
sys.stdout.write('Warning: Maximum number of iterations is too for calculation of curvature. Automatic stopping not available.\n')
if autostop:
indopt = -1
else:
indopt = maxiter
curv = np.zeros_like(RMS)
if indopt < 0 and display:
sys.stdout.write('Calculation of optimal stopping using the max curvature method failed.\n')
elif display:
sys.stdout.write('Estimated optimal stopping point is iter:' + repr(indopt) + '\n')
return RMS,curv,indopt
[docs]def EEM(SH,M,bdtilde,display=True,autostop=True):
"""
Determine the optimal stopping by calculating the estimated measurement
for each intermediate estimate and the weighted rms difference from the actual
measurement. The optimal stopping point is that with the first local minimal
weighted rms difference.
:param SH: ndarray of shape (N,K) with N the number of iterations and K the number of wavelengths
:param M: ndarray of shape (K,) containing the measured spectrum values (possibly after interpolation)
:param btilde: ndarray of shape (L,) containg the values of the reversed bandpass function (i.e., the line spread function)
:param autostop: (optional) int, if equal to 1 then the point of maximum curvature is calculated
:param display: (optional) boolean, if True then comments and intermediate results are printed to the command line
:returns: wDiff, ndarray of shape (N,) containing the weighted rms errors of the estimated measurement for each iteration
:returns: wDiff (twice, to have the same input-output structure as `calcOptCurv`)
:returns: indopt, int -- the index of the optimal stopping iteration
This criterion is published in:
Jan Audenaert et al. *Bayesian deconvolution method applied to experimental bidirectional
transmittance distribution functions*, Meas. Sci. Techol. 24, 2013
"""
from scipy.signal import argrelmin
lenSH,lM = np.shape(SH)
maxiter = lenSH-1
if lM!=len(M):
raise ValueError('Dimension of estimated spectra and measured spectrum do not fit together.\n')
if display:
sys.stdout.write('Calculating optimal stopping iteration by calculating the estimated measurement.\n')
sys.stdout.write('number of estimated spectrum values = ' + repr(lenSH) + '\n')
sys.stdout.write('maximum number of RichLucy iterations = ' + repr(maxiter) + '\n')
wDiff = np.zeros((maxiter))
if display:
sys.stdout.write('Convolving the estimated spectra with the bandpass to calculate estimated measurement')
for r in range(maxiter):
Mest = np.convolve(SH[r+1],bdtilde,'same')
inds = np.nonzero(Mest>1e-12)
if np.count_nonzero(Mest<0)>0:
raise ValueError('Estimated measurement for estimated spectrum results in negative values (Nr. '+repr(r)+')')
wDiff[r] = np.sqrt( np.sum( ( ((Mest[inds]-M[inds])/Mest[inds])**2 ) )/len(inds))
if display and np.mod(r,maxiter*0.1)==0:
sys.stdout.write('.')
if display:
sys.stdout.write('done\n')
if autostop and maxiter>min_auto_iter:
if any(np.isnan(wDiff)) or any(np.isinf(wDiff)) or np.min(wDiff)==np.max(wDiff):
if display:
print "Calculation of automatic stopping by EEM method failed."
return wDiff,wDiff,-1
indopts= argrelmin(wDiff)[0]
if display:
sys.stdout.write('Found ' + repr(len(indopts)) + ' local minima\n')
if len(indopts)>0:
if indopts[0]<min_auto_iter and len(indopts)>1:
indopts = indopts[1:]
indopt = np.nonzero(wDiff==np.min(wDiff[indopts]))[0]
else:
if display:
sys.stdout.write('Taking absolute minimum instead.\n')
indopt = np.argmin(wDiff)
else:
if autostop and display:
sys.stdout.write('Warning: Maximum number of iterations is too small. Automatic stopping not available.\n')
indopt = maxiter
if isinstance(indopt,np.ndarray):
indopt = indopt[0]
if indopt>0 and display:
sys.stdout.write('Estimated optimal stopping point is iter:' + repr(indopt) + '\n')
return wDiff,wDiff,indopt
def RMSthreshold(SH,M,bdtilde,display=True,autostop=True,tau=1e-4):
"""Stop the iterations once the rms difference between subsequent iterations is relatively below tau
"""
lenSH, lM = np.shape(SH)
maxiter = lenSH-1
if lM!=len(M):
raise ValueError('Dimension of estimated spectra and measured spectrum do not fit together.\n')
if display:
sys.stdout.write('Calculating optimal stopping iteration by using a threshold value on the RMS of change.\n')
sys.stdout.write('number of estimated spectrum values = ' + repr(lenSH) + '\n')
sys.stdout.write('maximum number of RichLucy iterations = ' + repr(maxiter) + '\n')
sys.stdout.write('tolerance value = ' + repr(tau) + '\n')
RMSDiff = np.zeros((maxiter))
relRMSDiff = np.zeros((maxiter))
if display:
sys.stdout.write('Calculating the rms difference between subsequent iteration results...')
for r in range(1,maxiter+1):
RMSDiff[r-1] = np.sqrt( np.mean( (SH[r,:]-SH[r-1,:])**2 ) )
relRMSDiff[r-1] = RMSDiff[r-1]/np.linalg.norm(SH[r-1,:])
if display:
sys.stdout.write('done\n')
if autostop and maxiter>min_auto_iter:
if any(np.isnan(RMSDiff)) or any(np.isinf(RMSDiff)) or np.min(RMSDiff)==np.max(RMSDiff):
if display: print "Calculation of optimal stopping using the RMS threshold method failed."
return RMSDiff,RMSDiff,-1
iters = np.arange(maxiter)
indopts= iters[relRMSDiff<=tau]
if len(indopts)>0:
if indopts[0]<min_auto_iter and len(indopts)>1:
indopts = indopts[1:]
indopt = indopts[0]
else:
indopt = maxiter
if display:
sys.stdout.write('Relative rms between subsequent iterations was never below chosen threshold - taking maxiter as optimal iteration.\n')
else:
if autostop and display:
sys.stdout.write('Warning: Maximum number of iterations is too small. Automatic stopping not available.\n')
indopt = maxiter
if indopt>0 and display:
sys.stdout.write('Estimated optimal stopping point is iter:' + repr(indopt) + '\n')
return RMSDiff,relRMSDiff,indopt