# -*- coding: utf-8 -*-
"""
Copyright S. Eichstaedt, F. Schmaehling (PTB) 2013
sascha.eichstaedt@ptb.de
This software is licensed under the BSD-like license:
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
This software was developed at Physikalisch-Technische Bundesanstalt.
The software is made available 'as is' free of cost. PTB assumes
no responsibility whatsoever for its use by other parties, and makes no
guarantees, expressed or implied, about its quality, reliability, safety,
suitability or any other characteristic. In no event will PTB be liable
for any direct, indirect or consequential damage arising in connection
with the use of this software.
Citation when using this software is:
Eichstaedt, Schmaehling, Wuebbeler, Anhalt, Buenger, Krueger & Elster
'Comparison of the Richardson-Lucy method and a classical approach for
spectrometer bandwidth correction', Metrologia vol. 50, 2013
DOI: 10.1088/0026-1394/50/2/107
In case of questions or suggestions contact Sascha.Eichstaedt@ptb.de
This module contains the following functions:
* calcOptCurv
* caclOptDiff
* RichLucy
* EstMeas
* Interpolate2Equidist
* Interpolate2SameStep
* InterpolateBandpass
* LoadFromFile
* RichLucyUncertainty
* RichLucyDeconv
"""
version = 1.95
min_auto_iter = 4 # smallest number of iterations for automatic stopping
rel_tol_zero = 1e-6 # relative tolerance for measured spectrum values for judgement of automatic stopping
import numpy as np
import sys
from scipy.interpolate import interp1d
try:
from pyxll import xl_func,xl_macro
except ImportError:
# pyXLL is not available - use dummy decorator functions instead
def xl_func(signature):
def dummy(func):
return func
return dummy
def xl_macro(signature):
def dummy(func):
return func
return dummy
# function to calculate optimal iteration from curvature
[docs]def calcOptCurv(SH,M,btilde,autostop=1,display=False,orig_mode=False):
"""
Calculate the root-mean-squared progress in the RichLucy iterations.
Determine the optimal number of iterations from the curvature of the rms values.
:param SH: ndarray of shape (N,K) with N the number of iterations and K the number of wavelengths
:param M: ndarray of shape (K,) containing the measured spectrum values (possibly after interpolation)
:param btilde: ndarray of shape (L,) containg the values of the reversed bandpass function (i.e., the line spread function)
:param autostop: (optional) int, if equal to 1 then the point of maximum curvature is calculated
:param display: (optional) boolean, if True then comments and intermediate results are printed to the command line
:param orig_mode: (optional) boolean, if True then the optimal point is the global maximum curvature, if False then the optimal
point is taken to be that local maximum with smallest error in the estimated measurement
:returns: RMS, ndarray of shape (N,) containing the root-mean-squared progress in the RL estimate per iteration
:returns: curv ndarray of shape (N,) containg the curvature values of the RMS as a function of iteration
:returns: indopt the optimal stopping point as calculated by this criterion
This criterion is published in:
Eichstaedt, Schmaehling, Wuebbeler, Anhalt, Buenger, Krueger & Elster
*Comparison of the Richardson-Lucy method and a classical approach for
spectrometer bandwidth correction*, Metrologia vol. 50, 2013
"""
from scipy.interpolate import InterpolatedUnivariateSpline
from scipy.signal import argrelmax
def calcEME(SHcandidates):
m,n = SHcandidates.shape
EstMeasDiff = np.zeros((m,))
for r in range(m):
Mest = np.convolve(SHcandidates[r],btilde,'same')
EstMeasDiff[r] = np.sqrt(np.sum( (Mest - M)**2 ) / len(M))
return EstMeasDiff
min_curv_iter = 3
lenSH,lM = np.shape(SH)
maxiter = lenSH-1
if display:
sys.stdout.write('Calculating optimal stopping iteration using curvature information\n')
RMS = np.zeros((maxiter))
if display:
sys.stdout.write('Calculating root-mean-squared value of estimation progress..')
for r in range(1,maxiter+1):
RMS[r-1] = np.sqrt( np.mean( (SH[r,:]-SH[r-1,:])**2 ) )
if display and np.mod(np.round(maxiter/r),10)==0:
sys.stdout.write('.')
if display:
sys.stdout.write('done.\n')
if any(np.isnan(RMS)) or any(np.isinf(RMS)) or np.min(RMS)==np.max(RMS):
return RMS,RMS,-1
if maxiter>min_curv_iter:
if display:
sys.stdout.write('Calculating curvature of rms progress.\n')
iters = np.arange(1,maxiter+1)
q = np.linspace(np.log10(iters[0]),np.log10(iters[-1]),len(RMS))
dq = q[1]-q[0]
spline_qrms = InterpolatedUnivariateSpline(np.log10(iters),np.log10(RMS))
qrms = spline_qrms(q)
if np.count_nonzero(np.isnan(qrms))>0 and display:
sys.stdout.write('Warning: In calcOptCurv interpolation of rms values resulted in ' + repr(np.count_nonzero(np.isnan(qrms))) + ' NaN entries!\n')
sys.stdout.write('Results may be inaccurate.\n')
first_derivative = np.gradient(qrms,dq)
snd_derivative = np.gradient(first_derivative,dq)
curv = snd_derivative/(1+first_derivative**2)**1.5
if autostop:
# calculate optimal q-value
qmax = argrelmax(curv)[0]
relevant_qmax = []
for qm in qmax:
if qm >= np.log10(min_auto_iter):
relevant_qmax.append(qm)
if len(relevant_qmax)>0:
if orig_mode: # global maximum
qopt = q[curv==np.max(curv[qmax])]
qopt = qopt[0]
indopt = np.flatnonzero(np.log10(iters)<=qopt)
indopt = iters[indopt[-1]]
else: # maximum with smallest error in estimated measurement
indopts = []
for k in range(len(relevant_qmax)):
qopt = q[curv==curv[relevant_qmax[k]]]
qopt = qopt[0]
# transform q-value to iteration number
iopt = np.flatnonzero(np.log10(iters)<=qopt)
indopts.append(iters[iopt[-1]])
# calculate error of estimated measurement for the candidates
EME = calcEME(SH[indopts,:])
if display:
print "potential stopping points are:\n " + repr(indopts) + "\n"
optk = np.nonzero(EME == min(EME))[0]
indopt = indopts[optk]
else:
indopt = -1
else:
indopt = maxiter
else:
if display:
sys.stdout.write('Warning: Maximum number of iterations is too small for calculation of curvature. Automatic stopping not available.\n')
if autostop:
indopt = -1
else:
indopt = maxiter
curv = np.zeros_like(RMS)
if indopt < 0 and display:
sys.stdout.write('Calculation of optimal stopping using the max curvature method failed.\n')
return RMS,curv,indopt
def calcEEM(SH,M,bdtilde,autostop=1,display=False):
"""
Determine the optimal stopping by calculating the estimated measurement
for each intermediate estimate and the weighted rms difference from the actual
measurement. The optimal stopping point is that with the first local minimal
weighted rms difference.
:param SH: ndarray of shape (N,K) with N the number of iterations and K the number of wavelengths
:param M: ndarray of shape (K,) containing the measured spectrum values (possibly after interpolation)
:param btilde: ndarray of shape (L,) containg the values of the reversed bandpass function (i.e., the line spread function)
:param autostop: (optional) int, if equal to 1 then the point of maximum curvature is calculated
:param display: (optional) boolean, if True then comments and intermediate results are printed to the command line
:returns: wDiff, ndarray of shape (N,) containing the weighted rms errors of the estimated measurement for each iteration
:returns: wDiff (twice, to have the same input-output structure as `calcOptCurv`)
:returns: indopt, int -- the index of the optimal stopping iteration
This criterion is published in:
Jan Audenaert et al. *Bayesian deconvolution method applied to experimental bidirectional
transmittance distribution functions*, Meas. Sci. Techol. 24, 2013
"""
from scipy.signal import argrelmin
lenSH,lM = np.shape(SH)
maxiter = lenSH-1
if lM!=len(M):
raise ValueError('Dimension of estimated spectra and measured spectrum are inconsistent.\n')
if display:
sys.stdout.write('Calculating optimal stopping iteration by calculating the estimated measurement.\n')
wDiff = np.zeros((maxiter))
if display:
sys.stdout.write('Convolving the estimated spectra with the bandpass to calculate estimated measurement')
for r in range(maxiter):
Mest = np.convolve(SH[r+1],bdtilde,'same')
inds = np.nonzero(Mest>1e-12)
if np.count_nonzero(Mest<0)>0:
raise ValueError('Estimated measurement for estimated spectrum results in negative values (Nr. '+repr(r)+')')
wDiff[r] = np.sqrt( np.sum( ( ((Mest[inds]-M[inds])/Mest[inds])**2 ) )/len(inds))
if display and np.mod(r,maxiter*0.1)==0:
sys.stdout.write('.')
if display:
sys.stdout.write('done\n')
if autostop and maxiter>min_auto_iter:
indopts= argrelmin(wDiff)[0]
if len(indopts)>0:
if indopts[0]<min_auto_iter and len(indopts)>1:
indopts = indopts[1:]
indopt = np.nonzero(wDiff==np.min(wDiff[indopts]))[0]
else:
indopt = -1
else:
if autostop and display:
sys.stdout.write('Warning: Maximum number of iterations is too small. Automatic stopping not available.\n')
indopt = maxiter
return wDiff,wDiff,indopt
###################################### The actual Richardson-Lucy method #################################
[docs]def RichLucy(b,delta,M,maxiter=500,autostop=True,display=True,optFun=calcOptCurv,regFun=None,calcCrit=True,returnAll=False,initialShat=None):
"""
Richardson Lucy iteration under the assumption that the sampling of b and M is equidistant and
that the wavelength step size of b and M is equal.
:param b: ndarray of shape (N,) with the bandpass function values
:param delta: float, wavelength step size for b and M
:param M: ndarray of shape (L,) with the measured spectrum values
optional parameters
:param maxiter: (optional) maximum number of iterations, default is 500
:param autostop: (optional) boolean whether to automatically find optimal iteration number (default: True)
:param display: (optional) boolean whether to print information to the command line (default: True)
:param optFun: (optional) function handle to a function that calculates the optimal iteration number (default: calcOptCurv)
:param regFun: (optional) function handle to a function that regularizes each RichLucy iteration (e.g., Tikhonov)
:param calcCrit: (optional) boolean whether to calculate the stopping criterion irrespective of autostop (default:True)
:param returnAll: (optional) boolean whether to return all intermediate estimates (default: False)
:param initialShat: (optional) ndarray of shape (L,) with an initial estimate of the input spectrum (default: None)
Output:
:returns: Shat, ndarray of shape (L,) - the estimated spectrum
:returns: critVals1, ndarray of shape (maxiter,) containing values calculated from optFun (rms of progress for calcOptCurv)
:returns: critVals2, ndarray of shape (maxiter,) containing other values calculated from optFun (curvature of rms of progress for calcOptCurv)
"""
if display:
sys.stdout.write('\n -------------- Richardson-Lucy algorithm version ' + repr(version) + ' --------------\n')
if np.count_nonzero(M<0)>0 or np.count_nonzero(b<0)>0:
raise ValueError("Measured spectrum and bandpass function must not have negative values.")
if np.abs(np.trapz(b,dx=delta)-1)>1e-4:
raise ValueError("Line-spread function must be normalized.")
if maxiter<min_auto_iter:
if autostop and display:
sys.stdout.write('Warning: Maximum number of iterations is too small. Automatic stopping not available.\n')
autostop = False
if autostop:
calcCrit = True
# transform continuous b to discrete (impulse invariance technique)
bd = b*delta
bdtilde=bd[::-1]
# detect relevant region of meas. spectrum (judge estimate only there)
indM = np.nonzero(M>=np.max(M)*rel_tol_zero)[0]
if len(indM)<len(M) and autostop and display:
print "Data contains " + repr(len(M)-len(indM)) + " values which are relatively close to zero. "
print "These will not be considered for the determination of the stopping point."
# adjust length of M and b for proper convolution
if M.size < b.size:
sys.stdout.write("The bandpass function seems to be defined over a larger wavelength region than the actual measurement.\n")
sys.stdout.write("Padding the missing spectrum values with zeros, but results may be inaccurate\n")
leng_diff = b.size - M.size
M = np.concatenate((np.zeros(leng_diff),M,np.zeros(leng_diff)))
# allocate computer memory
SH = np.zeros((maxiter+1,M.size))
# initial 'iteration'
r = 0.0
if isinstance(initialShat,np.ndarray):
if len(initialShat) == M.size:
SH[r,:] = initialShat
else:
if display:
print "User-supplied initial estimate does not match measured spectrum and will be ignored."
SH[r,:] = M
else:
SH[r,:] = M;
# additional iterations
if display:
if autostop:
print('Richardson-Lucy method calculating optimal stopping criterion using ' + optFun.__name__)
print('Step 1: Carry out ' + repr(int(maxiter)) + ' iterations.')
else:
print('Richardson-Lucy method with ' + repr(int(maxiter)) + ' iterations')
####################### original RL iterations
while r < maxiter:
r = r+1
# actual RichLucy step
tmp1 = np.convolve(SH[r-1,:],bdtilde,'same')
tmp1[tmp1!=0] = M[tmp1!=0]/tmp1[tmp1!=0]
# if anything went wrong during convolution - set to zero
count_all = np.count_nonzero(tmp1==0)
tmp1[np.isnan(tmp1)] = 0; tmp1[np.isinf(tmp1)] = 0
count_bad = np.count_nonzero(tmp1==0)-count_all
if display and count_bad>0:
sys.stdout.write('After convolution in RL iteration %d, %d estimated values were set to zero. \n' % (r,count_bad))
tmp2 = np.convolve(tmp1,bdtilde[::-1],'same')
if not regFun is None:
Shat_new = regFun(SH[r-1,:],delta)*tmp2
else:
Shat_new = SH[r-1,:]*tmp2
Shat_new[np.isnan(Shat_new)]=0; Shat_new[np.isinf(Shat_new)]=0
SH[r,:]=Shat_new
if np.mod(r,maxiter*0.05)==0 and display:
sys.stdout.write(".")
######################
if display:
print(' Done.')
if autostop:
print('Step 2: Calculating optimal stopping using '+optFun.__name__)
if calcCrit:
critVals1,critVals2,indopt = optFun(SH[:,indM],M[indM],bdtilde,autostop,display=display)
else:
critVals1 = np.zeros((maxiter))
critVals2 = np.zeros((maxiter))
indopt = maxiter
if indopt<0:
print('Warning: Calculation of optimal stopping failed. Using measured spectrum as best estimate.')
Shat = M
else:
if autostop:
if display:
print('Optimal number of iterations = ' + repr(int(indopt)))
Shat = np.squeeze(SH[indopt-1,:])
else:
Shat = SH[-1,:]
if returnAll:
if calcCrit:
return Shat,critVals1,critVals2,SH
else:
return Shat, SH
else:
if calcCrit:
return Shat,critVals1,critVals2
else:
return Shat
######################### some helper functions for several tasks
[docs]def LoadFromFile(filename):
"""
Load wavelength scale axis, measured values and uncertainties from file.
Output of this method is the matrix "data".
:param filename: string, name of the file to load (including full path)
:returns: data matrix
:raises: `ValueError` if data file could not be loaded.
"""
try:
data = np.loadtxt(filename,comments='#')
except ValueError:
try:
data = np.loadtxt(filename,comments='#',delimiter=',')
except ValueError:
try:
data = np.loadtxt(filename,comments='#',delimiter=';')
except ValueError:
raise ValueError("Data file could not be loaded." \
"Please provide data column-wise and separated by whitespaces. " \
"For further details see the README file.\n")
return data
@xl_func("numpy_column lambda, numpy_column values: numpy_array")
[docs]def InterpolateBandpass(lamb_axis,values):
"""
Interpolation of bandpass values (line spread function). This is just
a wrapper to allow for a convenient function call from EXCEL.
"""
newAx, newVals = Interpolate2Equidist(lamb_axis,values)[:2]
result = np.vstack((newAx,newVals))
result = result.transpose()
return result
@xl_func("numpy_column lambda_b, numpy_column b: var")
[docs]def checkBandpass(lambda_b,b):
"""
For given bandpass values b and corresponding wavelengths lambda_b, this
function determines whether::
* the lambda axis is strictly monotone increasing
* the lambda axis is equidistant
* the bandpass is normalized to integrate to 1
:param lambda_b: ndarray of shape (N,) with wavelengths for the bandpass function
:param b: ndarray of shape (N,) with corresponding bandpass values
:returns: result - a list: [lambda monotonically increasing?, is equidistant?, is normalized?, delta_b]
"""
#adjust range of values
inds = np.nonzero(lambda_b>0.0)
lambda_b = lambda_b[inds]
b = b[inds]
#check monotonicity
if not np.all(np.sort(lambda_b) == lambda_b) or np.count_nonzero((np.diff(lambda_b)<1e-14)) > 0:
check_mono = "False"
else:
check_mono = "True"
#check equidistance
if np.max(np.diff(lambda_b)) - np.min(np.diff(lambda_b)) > 1e-5:
check_equi = "False"
else:
check_equi = "True"
#check normalization
bn = np.trapz(b,lambda_b)
if np.abs(1-bn) > 1e-5:
check_norm = "False (integral = %1.4e)" % bn
else:
check_norm = "True"
delta = np.min(np.abs(np.diff(lambda_b)))
result = [[check_mono],[check_equi],[check_norm],[delta]]
return result
@xl_func("numpy_column lambda_M, numpy_column M: var")
[docs]def checkSpectrum(lambda_M,M):
"""
For given vector of measured spectrum values and corresponding wavelengths
this function determines whether::
* the lambda axis is strictly monotonically increasing
* the lambda axis is equidistant
:param lambda_M: ndarray of shape (N,) with wavelengths for the measured spectrum
:param M: ndarray of shape (N,) with corresponding spectrum values
:returns: result - the list: [lambda_M monotonically increasing?, is equidistant?, delta_M]
"""
#adjust range of values
inds = np.nonzero(lambda_M>0.0)
lambda_M = lambda_M[inds]
M = M[inds]
#check monotonicity
if not np.all(np.sort(lambda_M) == lambda_M) or np.count_nonzero((np.diff(lambda_M)<1e-14)) > 0:
check_mono = "False"
else:
check_mono = "True"
#check equidistance
if np.max(np.diff(lambda_M)) - np.min(np.diff(lambda_M)) > 1e-5:
check_equi = "False"
else:
check_equi = "True"
delta = np.min(np.diff(lambda_M))
result = [[check_mono],[check_equi],[delta]]
return result
[docs]def Interpolate2Equidist(lamb_axis,values,display=True,interp_kind='cubic',tol_equi=1e-5):
"""
Interpolation of measured values if wavelength scale axis is not equidistant.
Input of this function are the original measured values and the corresponding
wavelength scale. The output is the interpolated version with the new
scale, a flag whether interpolated or not and a tuple (numNaN,numNeg)
with the number of NaN and negative values after interpolation.
:param lamb_axis: ndarray of shape (N,) with wavelength values
:param values: ndarray of shape (N,) with measured values
optional parameters
:param display: boolean whether to print information to the command line (default:True)
:param interp_kind: string which is handed over to the scipy interp1d function (default:'cubic')
:param tol_equi: float, tolerance value to judge whether wavelength axis is equidistant (default:1e-5)
Outputs
:returns: newAx, ndarray of shape (L,) with equidistant wavelength scale
:returns: newVals, ndarray of shape (L,) with corresponding interpolated spectrum values
:returns: did_interp, boolean whether interpolation was actually necessary
:returns: numNaN, numNeg, ints - the number of NaN and negative values after interpolation
"""
if display:
print "\n Check for equidistant wavelength scale axis "
if not np.all(np.sort(lamb_axis) == lamb_axis) or np.count_nonzero(lamb_axis)<1e-14>0:
print "Error in Interpolate2Equidist: Wavelength scale is not strictly monotonically increasing!"
return
if np.max(np.diff(lamb_axis)) - np.min(np.diff(lamb_axis)) > tol_equi:
delta = min(np.diff(lamb_axis))
if display:
print "Wavelength scale is not equidistant."\
"New wavelength scale step size is %1.4f. Interpolating..." % delta
newAx = np.arange(lamb_axis[0],lamb_axis[-1],delta)
Intfunc = interp1d(lamb_axis,values,kind=interp_kind,bounds_error=False)
newVals = Intfunc(newAx)
numNaN = np.count_nonzero(np.isnan(newVals))
numNeg = np.count_nonzero(newVals<0)
newVals[np.nonzero(np.isnan(newVals))]=0.0
newVals[newVals<0] = 0.0
if display:
print "Interpolation resulted in %d NaN(s) and %d negative value(s)." % (numNaN,numNeg)
if numNaN+numNeg>0:
print "All set to zero."
did_interp = True
else:
if display:
print "Wavelength scale axis is equidistant. No interpolation necessary."
newAx = lamb_axis
newVals = values
numNaN = 0
numNeg = 0
did_interp = False
if display:
print ""
return newAx, newVals, did_interp, numNaN, numNeg
[docs]def Interpolate2SameStep(M,lambda_M,delta_b,display=True,interp_kind='cubic',tol_equi=1e-5):
"""
Interpolation of measured spectrum if its wavelength step size is not equal to that
of the bandpass function or if it is not equidistant. The output is the interpolated version with the new scale,
a flag whether interpolated or not and a tuple (numNaN,numNeg) with the number of NaN and negative values after interpolation.
:param M: ndarray of shape (N,) with the measured spectrum values
:param lambda_M: ndarray of shape (N,) with the corresponding wavelengths
:param delta_b: float, the wavelength step size for the bandpass function
optional parameters
:param display: boolean, whether to print information to the command line
:param interp_kind: string, which is passed to the scipy interp1d function
:param tol_equi: float, tolerance for judgin whether the wavelength step sizes are equal
Outputs
:returns: lm_interp - ndarray of shape (K,) of new wavelength values for measured spectrum
:returns: M_interp - ndarray of shape (K,) of corresponding interpolated spectrum values
:returns: did_interp - boolean whether interpolation was actually necessary
:returns: numNaN,numNeg - ints, the number of NaN and negative values after interpolation
"""
if display:
print "\n Check for equidistant wavelength scale axis"
if not np.all(np.sort(lambda_M) == lambda_M) or np.count_nonzero((np.diff(lambda_M)<1e-14)) >0:
raise ValueError("Error in Interpolate2SameStep: Wavelength scale is not strictly monotonically increasing!")
deltaM = min(np.diff(lambda_M))
if np.abs(deltaM-delta_b) > tol_equi or np.max(np.diff(lambda_M)) - np.min(np.diff(lambda_M)) > tol_equi:
if display:
print "Wavelength step size in measured spectrum requires interpolation."\
"New step size is %1.4f. Interpolating..." % delta_b
lm_interp = np.arange(lambda_M[0],lambda_M[-1],delta_b)
spline_M = interp1d(lambda_M,M,bounds_error=False,kind=interp_kind)
M_interp = spline_M(lm_interp)
numNaN = np.count_nonzero(np.isnan(M_interp))
numNeg = np.count_nonzero(M_interp<0)
M_interp[np.nonzero(np.isnan(M_interp))] = 0.0
M_interp[M_interp<0] = 0.0
if display:
print "Interpolation resulted in %d NaN(s) and %d negative value(s)." % (numNaN,numNeg)
if numNaN+numNeg>0:
print "All set to zero."
did_interp = True
else:
if display:
print "Wavelength scale step size equal and equidistant. No interpolation necessary."
lm_interp = lambda_M
M_interp = M
numNaN = 0
numNeg = 0
did_interp = False
if display:
print ""
return lm_interp,M_interp,did_interp,numNaN,numNeg
def EstMeas(Shat,b,lambda_S,lambda_b,display=False):
"""
Calculate the measurement result estimated from an estimated input spectrum
and the bandpass function.
If necessary, the bandpass is interpolated to obtain an equidistantly sampled sequence.
If necessary, the estimated input spectrum Shat is interpolated to obtain
the same step size as for the bandpass
"""
if display:
print "---- Calculating estimated measurement from bandpass and RichLucy estimate ----- "
lambda_b,b = Interpolate2Equidist(lambda_b,b,display=display)[:2]
deltab = lambda_b[1]-lambda_b[0]
b = b/np.trapz(b,lambda_b)
lS_interp,Shat,interpolated = Interpolate2SameStep(Shat,lambda_S,deltab,display=display)[:3]
Mhat = np.convolve(Shat,b[::-1]*deltab,'same')
if interpolated:
Mhat_interp = interp1d(lS_interp,Mhat,kind='linear',bounds_error=False,fill_value=0.0)
Mhat = Mhat_interp(lambda_S)
if display:
print "-----------------------------------------------------------"
Mhat[np.nonzero(Mhat<0)] = 0.0
return Mhat, lambda_S
##############################################################
[docs]def RichLucyUncertainty(bhat,b_unc,lambda_b,Mhat,M_unc,lambda_M,\
runs=100,maxiter=500,autostop=1,display=1,maxncpu=10,\
optFun=calcOptCurv,calcCrit=True,returnAll=False,plotRes=False):
"""
This function carries out uncertainty evaluation for the Richardson-Lucy
deconvolution by using the GUM Supplement 2 Monte Carlo method. It assumes
that knowledge about the input quantities (i.e., bandpass and measured spectrum)
is available in terms of an estimate and a vector of associated standard uncertainties.
TODO: Extend the uncertainty evaluation to arbitrary type of input knowledge.
:param bhat: ndarray of shape (N,) with estimate of bandpass values
:param b_unc: ndarray of shape (N,) with associated standard uncertainties
:param lambda_b: ndarray of shape (N,) with corresponding wavelengths
:param Mhat: ndarray of shape (L,) with measured spectrum values
:param M_unc: ndarray of shape (L,) with associated standard uncertainties
:param lambda_M: ndarray of shape (L,) with corresponding wavelengths
optional arguments
:param runs: int - the number of Monte Carlo runs for uncertainty propagation
:param maxiter: int - the maximum number of RichLucy iterations per run
:param autostop: boolean - whether to determine optimal stopping point
:param display: boolean - whether to print information to the command line
:param maxncpu: int - the maximum number of parallel workers for the Monte Carlo
:param optFun: function for the determination of the optimal stopping point
:param calcCrit: boolean - whether to calculate the stopping criterion using optFun irrespective of autostop
:param returnAll: boolean - whether to return all intermediate results
:param plotRes: boolean - whether to plot result
:returns: Shat, UShat - ndarrays of shape (L,) with the mean estimated spectrum and its associated uncertainty, respectively
"""
from time import time
from multiprocessing import Queue,Process,cpu_count
if plotRes:
from matplotlib.pyplot import figure,plot,clf,legend,imshow,show,colorbar,title
all_args = (bhat,b_unc,lambda_b,Mhat,M_unc,lambda_M,autostop,display,maxiter)
# call routine once to check for errors before calling it with several processes
try:
test_args = (bhat,b_unc,lambda_b,Mhat,M_unc,lambda_M,autostop,False,maxiter)
__callRLparallel(test_args)
finally:
print ""
SH = np.zeros((len(Mhat),runs))
"""
The actual call of the RichLucy method has to be in seperate methods
in order to work with the Queue concept of multiprocessing (functions can't be pickled)
"""
t0 = time()
task_Queue = Queue()
result_Queue = Queue()
ncpu = min( max(cpu_count()-2,1), maxncpu)
sys.stdout.write('\nStarting ' + repr(runs) + ' Monte Carlo trials with ' + str(ncpu) + ' parallel workers\n')
tasks = [(__callRLparallel, (all_args)) for k in range(runs)]
for task in tasks:
task_Queue.put(task)
for i in range(ncpu):
Process(target=__QueueWorker, args=(task_Queue,result_Queue)).start()
results = [result_Queue.get() for task in tasks]
for i in range(ncpu):
task_Queue.put('STOP')
dauer = time()-t0
SH = np.array(results)
Shat = np.mean(SH,axis=0)
uShat = np.std(SH,axis=0)
UShat = np.cov(SH.T)
if len(Shat) != len(Mhat):
raise ValueError("Something went wrong in the Monte Carlo method. Nothing will be returned.")
return
if plotRes:
figure(1);clf()
plot(lambda_M,Mhat,lambda_M,Shat,lambda_M,uShat)
legend(('measured','estimated','uncertainties'))
figure(2);clf()
imshow(UShat)
title("covariance matrix")
colorbar()
show()
print "-----------------------------------------------------------"
print "All done. Monte Carlo with %d runs took about %1.2f seconds" % (runs,np.round(dauer))
print "-----------------------------------------------------------"
return Shat, UShat
################ helper functions for the Monte Carlo ##################
def __QueueWorker(input,outQueue):
for func,fun_args in iter(input.get,'STOP'):
result = func(fun_args)
outQueue.put(result)
def __callRLparallel(fun_args):
# Carries out the individual Monte Carlo trials
bhat,b_unc,lambda_b,Mhat,M_unc,lambda_M,autostop,display,maxiter = fun_args
# draw from the original distributions (withput interpolation)
M = Mhat + np.random.randn(Mhat.size)*M_unc
b = bhat + np.random.randn(bhat.size)*b_unc
lb,b = Interpolate2Equidist(lambda_b,b,display=display)[:2]
deltab = lb[1]-lb[0]
lM,M,interpolated = Interpolate2SameStep(M,lambda_M,deltab,display=display)[:3]
M[M<0] = 0.0
b[b<0] = 0.0
# normalization of bandpass
b = b/np.trapz(b,dx=deltab)
if interpolated:
if display:
print "Running RichLucy with interpolated data..."
Shat_interp = RichLucy(b,deltab,M,\
autostop=autostop,maxiter=maxiter,display=display)[0]
spline_Shat = interp1d(lM,Shat_interp,bounds_error=False,kind='linear')
Shat = spline_Shat(lambda_M)
Shat[np.nonzero(np.isnan(Shat))] = 0.0
Shat[Shat<0.0] = 0.0
else:
if display:
print "Running RichLucy..."
Shat = RichLucy(b,deltab,M,autostop=autostop,\
maxiter=maxiter,\
display=display)[0]
return Shat
##########################################
def __Excel2NDArray(tupleofTuples):
# flatten the tuple of tuples to a 1D list
all_vals = [element for tup in tupleofTuples for element in tup]
# throw away the None values (corresp. to empty cells in Excel)
vals = filter(None,all_vals)
# convert to ndarray
return np.asarray(vals)
@xl_macro("float dummy")
def __RLD_macro(dummy=None):
import pyxll
import logging
_log = logging.getLogger(__name__)
try:
import win32com.client
except ImportError:
_log.warning("*** win32com.client could not be imported ***")
_log.warning("*** the RichLucy Macro is working only under Microsoft Windows ***")
xl_window = pyxll.get_active_object()
xl_app = win32com.client.Dispatch(xl_window).Application
# it's helpful to make sure the gen_py wrapper has been created
# as otherwise things like constants and event handlers won't work.
win32com.client.gencache.EnsureDispatch(xl_app)
lambda_bandpass = __Excel2NDArray(xl_app.Range("A2:A1000").Value)
bandpass = __Excel2NDArray(xl_app.Range("B2:B1000").Value)
lambda_spectrum = __Excel2NDArray(xl_app.Range("C2:C1000").Value)
spectrum = __Excel2NDArray(xl_app.Range("D2:D1000").Value)
val_range = "E2:E%d" % (len(spectrum)+1)
max_it = xl_app.Range("maxiter").Value
if max_it==None:
maxiter = 500
else:
maxiter = int(xl_app.Range("maxiter").Value)
curv_range = "F2:F%d" % (maxiter+1)
xl_app.Range(val_range).Value = np.zeros((len(spectrum),1))
xl_app.Range(curv_range).Value = np.zeros((maxiter,1))
Shat,RMS,curv = RichLucyDeconv(lambda_bandpass,bandpass,lambda_spectrum,spectrum,maxiter,returnAll=True)
xl_app.Range(val_range).Value = np.reshape(Shat,(len(Shat),-1))
xl_app.Range(curv_range).Value = np.reshape(curv,(len(curv),-1))
@xl_func("numpy_column lambda_bandpass, numpy_column bandpass, numpy_column lambda_spectrum, numpy_column spectrum, int maxiter: numpy_column")
[docs]def RichLucyDeconv(lambda_bandpass,bandpass,lambda_spectrum,spectrum,display=True,returnAll=False,**kwargs):
"""
Application of Richardson-Lucy deconvolution for provided bandpass (line spread function)
and measured spectrum. This method carries out interpolation if the sampling of bandpass
and spectrum does not meet the RichLucy assumptions. If necessary, the estimation result
is returned interpolated to the original wavelength values.
:param lambda_bandpass: ndarray of shape (N,) with wavelengths for the bandpass function
:param bandpass: ndarray of shape (N,) with corresponding bandpass values
:param lambda_spectrum: ndarray of shape (L,) with wavelengths for the measured spectrum
:param spectrum: ndarray of shape (L,) with corresponding measured spectrum values
optional arguments
:param maxiter: int - maximum number of Richardson-Lucy iterations
:returns: Shat - ndarray of shape (L,) with the estimated spectrum values
If returnAll = True:
:returns: RMS - ndarray of shape (maxiter,) with root-mean-squared values of the estimation progress
:returns: curv - ndarray of shape (maxiter,) - the curvature of RMS
"""
if display:
print '\n\n-------------------------------------------------------'
print 'Richardson-Lucy algorithm for bandpass correction'
print 'S. Eichstaedt, F Schmaehling (PTB) 2013'
print 'Usage of this software is without any warranty.'
print '\n-------------------------------------------------------'
_ = kwargs.pop("returnAll",False)
lb,b,interpolated = Interpolate2Equidist(lambda_bandpass,bandpass,display=display)[:3]
deltab = lb[1]-lb[0]
intb = np.trapz(b,dx=deltab)
if np.abs(1-intb)>1e-4:
if display:
print 'Bandpass function is not normalized...doing it now.'
b = b/intb
centroid = np.trapz(b*lb,dx=deltab)
if display:
print 'central wavelength of bandpass function is %1.4f' % (centroid)
lambda_central = np.mean(lb)
if display:
print 'central wavelength of provided wavelength scale is %1.4f' % (lambda_central)
shift = centroid - lambda_central
if shift>1e-3 and display:
print 'Suggested shift of wavelength scale is %1.4f'% (shift)
lm,M,interpolated_M = Interpolate2SameStep(spectrum,lambda_spectrum,deltab,display=display)[:3]
deltam = lm[1]-lm[0]
if display:
print 'bandpass function:'
print ' wavelength scale: [%1.3f,%1.3f]' % (lb[0],lb[-1])
print ' wavelength step: %1.4f' % (deltab)
print 'measured spectrum:'
print ' wavelength scale: [%1.3f,%1.3f]' % (lm[0],lm[-1])
print ' wavelength step: %1.4f' % (deltam)
if interpolated_M:
if display:
print('Measured spectrum required interpolation.')
print('Using spline interpolation for measurement data and wavelength\n step length of bandpass measurement.')
Shat_interp,RMS,curv = RichLucy(b,deltab,M,display=display,**kwargs)
if display:
print "RichLucy finished. Interpolating estimate back to original wavelength scale."
spline_Shat = interp1d(lm,Shat_interp,bounds_error=False,kind='linear')
Shat = spline_Shat(lambda_spectrum)
else:
Shat,RMS,curv = RichLucy(b,deltab,M,display=display,**kwargs)
Shat[np.nonzero(np.isnan(Shat))]=0.0
if returnAll:
return Shat,RMS,curv
else:
return Shat
@xl_macro("float dummy")
def __RLDUnc_macro(dummy=None):
import pyxll
import logging
_log = logging.getLogger(__name__)
try:
import win32com.client
except ImportError:
_log.warning("*** win32com.client could not be imported ***")
_log.warning("*** the RichLucy Macro is working only under Microsoft Windows ***")
xl_window = pyxll.get_active_object()
xl_app = win32com.client.Dispatch(xl_window).Application
# it's helpful to make sure the gen_py wrapper has been created
# as otherwise things like constants and event handlers won't work.
win32com.client.gencache.EnsureDispatch(xl_app)
lb = __Excel2NDArray(xl_app.Range("A2:A1000").Value)
b = __Excel2NDArray(xl_app.Range("B2:B1000").Value)
Ub = __Excel2NDArray(xl_app.Range("C2:C1000").Value)
lM = __Excel2NDArray(xl_app.Range("D2:D1000").Value)
M = __Excel2NDArray(xl_app.Range("E2:E1000").Value)
UM = __Excel2NDArray(xl_app.Range("F2:F1000").Value)
val_range = "G2:G%d" % (len(M)+1)
unc_range = "H2:H%d" % (len(M)+1)
max_it = xl_app.Range("MC_maxiter").Value
if max_it==None:
maxiter = 500
else:
maxiter = int(max_it)
MC = xl_app.Range("MCruns").Value
if MC==None:
MCruns = 100
else:
MCruns = int(MC)
xl_app.Range(val_range).Value = np.zeros((len(M),1))
xl_app.Range(unc_range).Value = np.zeros((len(M),1))
Res = RichLucyDeconvWithUncertainty(lb,b,Ub,lM,M,UM,MCruns,maxiter)
xl_app.Range(val_range).Value = np.reshape(Res[:,0],(len(M),-1))
xl_app.Range(unc_range).Value = np.reshape(Res[:,1],(len(M),-1))
@xl_func("numpy_column lambda_b, numpy_column b, numpy_column Ub, numpy_column lambda_M, numpy_column M, numpy_column M, int MCruns, int maxiter: numpy_array")
def RichLucyDeconvWithUncertainty(lb,b,Ub,lm,M,UM,MCruns=100,maxiter=500):
"""
Application of Richardson-Lucy deconvolution for provided bandpass (line spread function)
and measured spectrum. This method carries out interpolation if the sampling of bandpass
and spectrum does not meet the RichLucy assumptions. If necessary, the estimation result
is returned interpolated to the original wavelength values.
Evaluation of uncertainty associated with the estimation result is carried out using
GUM-S2 Monte Carlo.
"""
print '\n\n-------------------------------------------------------'
print 'Richardson-Lucy algorithm for bandpass correction command line tool'
print 'S. Eichstaedt, F Schmaehling (PTB) 2013'
print 'Usage of this software is without any warranty.'
print '\n-------------------------------------------------------'
print 'maximum number of iterations: %d' % maxiter
print 'Uncertainties will be calculated using %d Monte Carlo trials' % MCruns
print('-------------------------------------------------------\n')
sys.executable = "C:\Python27\python.exe"
Shat,UShat = RichLucyUncertainty(b,Ub,lb,M,UM,lm,\
runs=MCruns,maxiter=maxiter)
uShat = np.sqrt(np.diag(UShat))
result = np.vstack((Shat,uShat))
return result.transpose()
@xl_macro("string x: int")
def py_strlen(x):
"""returns the length of x"""
return len(x)
@xl_macro("numpy_column lambda_bandpass, numpy_column bandpass, numpy_column lambda_spectrum, numpy_column spectrum, int maxiter: numpy_column")
def RichLucyDeconvMacro(lambda_bandpass,bandpass,lambda_spectrum,spectrum,maxiter=500):
"""
Application of Richardson-Lucy deconvolution for provided bandpass (line spread function)
and measured spectrum. This method carries out interpolation if the sampling of bandpass
and spectrum does not meet the RichLucy assumptions. If necessary, the estimation result
is returned interpolated to the original wavelength values.
"""
print '\n\n-------------------------------------------------------'
print 'Richardson-Lucy algorithm for bandpass correction command line tool'
print 'S. Eichstaedt, F Schmaehling (PTB) 2013'
print 'Usage of this software is without any warranty.'
print '\n-------------------------------------------------------'
lb,b,interpolated = Interpolate2Equidist(lambda_bandpass,bandpass)[:3]
deltab = lb[1]-lb[0]
intb = np.trapz(b,dx=deltab)
if np.abs(1-intb)>1e-4:
print 'Bandpass function is not normalized...doing it now.'
b = b/intb
centroid = np.trapz(b*lb,dx=deltab)
print 'central wavelength of bandpass function is %1.4f' % (centroid)
lambda_central = np.mean(lb)
print 'central wavelength of provided wavelength scale is %1.4f' % (lambda_central)
shift = centroid - lambda_central
if shift>1e-3:
print 'Suggested shift of wavelength scale is %1.4f'% (shift)
lm,M,interpolated_M = Interpolate2SameStep(spectrum,lambda_spectrum,deltab)[:3]
deltam = lm[1]-lm[0]
print 'bandpass function:'
print ' wavelength scale: [%1.3f,%1.3f]' % (lb[0],lb[-1])
print ' wavelength step: %1.4f' % (deltab)
print 'measured spectrum:'
print ' wavelength scale: [%1.3f,%1.3f]' % (lm[0],lm[-1])
print ' wavelength step: %1.4f' % (deltam)
if interpolated_M:
print('Measured spectrum required interpolation.')
print('Using spline interpolation for measurement data and wavelength\n step length of bandpass measurement.')
Shat_interp = RichLucy(b,deltab,M,maxiter)[0]
print "RichLucy finished. Interpolating estimate back to original wavelength scale."
spline_Shat = interp1d(lm,Shat_interp,bounds_error=False,kind='linear')
Shat = spline_Shat(lambda_spectrum)
else:
Shat = RichLucy(b,deltab,M,maxiter)[0]
Shat[np.nonzero(np.isnan(Shat))]=0.0
return Shat
######################## demonstration of RichLucy - use from Python shell ###############
## generate simulated bandpass
[docs]def sim_bandpass(bandwidth,sampledist,skew=1,noise=0,noise_low=0):
"""
Generate a triangular bandpass function with given bandwidth
and skewnes (skew) on the wavelength axis [-bandwidth,bandwidth]
Relative measurement noise can be simulated using noise and noise_low
as maximum and minimum relative noise levels.
:param bandwidth: float - the bandwidth of the bandpass function
:param sampledist: float - the wavelength step size
:param skew: float - skewness of the triangle (left-skewed for <1.0, right-skewed for >1.0)
:param noise: float - upper limit for the relative bandpass uncertainty
:param noise_low: float - lower limit for the relative bandpass uncertainty
:returns: hat, lamb - the bandpass function and the corresponding wavelengths
"""
# definition of x-axis
lamb = np.arange(0.0,2*bandwidth+sampledist,sampledist)
ind_kl = np.nonzero(lamb < skew*bandwidth) # indices for increasing slope
ind_gr = np.nonzero(lamb >= skew*bandwidth) # indices for decreasing slope
# definition of bandpass function
hat = np.zeros_like(lamb)
hat[ind_kl] = lamb[ind_kl]/(skew*bandwidth**2)
hat[ind_gr] = -lamb[ind_gr]/( (2.0-skew)*bandwidth**2 ) + 2.0/( (2-skew)*bandwidth )
# definition of noise/measurement uncertainty
noiseslit = np.zeros_like(lamb)
noiseslit[ind_kl] = -((noise-noise_low)/(skew*bandwidth))*lamb[ind_kl] + noise
noiseslit[ind_gr] = ((noise-noise_low)/((2-skew)*bandwidth))*lamb[ind_gr] + (noise - 2*(noise-noise_low)/(2-skew))
u_hat = hat*noiseslit
epsilon = np.random.randn(len(lamb))*u_hat
hat = epsilon*u_hat+hat
lamb = lamb - bandwidth
hat[np.nonzero(hat<0)] = 0.0
return hat,lamb
## generate simulated measurement
[docs]def sim_measurement(Sfun,lamb,b,bandwidth,delta_b,noise_low=0,noise_up=0):
"""
Generate a Gaussian-shaped spectrum by simulating a measurement of a
clean Gaussian-shaped function Sfun defined on the wavelength axis lamb
with a triangular bandpass function b of given bandwidth and wavelength
step size delta_b.
Note that Sfun needs to be a function object.
Measurement noise can be simulated using the relative noise levels
noise_low and noise_up
:param Sfun: a function S(lambda) returning the spectrum at wavelengths lambda
:param lamb: ndarray of shape (N,) with wavelengths for measurement
:param b: ndarray of shape (L,) - the bandpass function values
:param bandwidth: float - the bandwidth of the bandpass function
:param delta_b: float - the wavelength step size of the bandpass function
:param noise_low: float - lower limit for the relative measurement uncertainty
:param noise_up: float - upper limit for the relative measurement uncertainty
:returns: M - ndarray of shape (N,) with simulated measured spectrum values
"""
if hasattr(Sfun,'__call__'):
# simulation of the measurement
M = np.zeros_like(lamb)
for ii in range(len(lamb)):
lambda_int = lamb[ii] + np.arange(-bandwidth,bandwidth+delta_b,delta_b)
M[ii] = np.trapz(Sfun(lambda_int)*b,lambda_int)
# add measurement noise
uncM = (-(noise_up-noise_low)*( M/np.max(M) ) + noise_up)*M
M = M + np.random.randn(len(M))*uncM
return M
else:
raise ValueError('The first input variable -Sfun- needs to be of type <function>.')
############################## demo of RL with simulated data ################
[docs]def demo(bandwidth=40,skew=1,FWHM=10,maxiter=2000):
"""
Demonstrate usage of the Richardson-Lucy method for bandpass correction.
A triangular bandpass of given bandwidth and skewness is used to simulate
a measurement of a Gaussian-shaped spectrum of given FWHM value.
The maximum number of Richardson-Lucy iterations is given by maxiter.
Two different automatic stopping criteria are employed and the results
are compared.
"""
from matplotlib.pyplot import figure,plot,clf,xlabel,ylabel,legend,show
noise_up = 2e-2 # max relative noise level of measured signal
noise_low = 3e-3 # min relative noise level of measured signal
noiseb_up = 2e-2 # max relative noise level of bandpass measurement
noiseb_low = 1e-3 # min relative noise level of bandpass measurement
delta_b = .1 # sampledist for monochromator (in nm)
delta_M = delta_b # sampledist for measurement points (in nm)
mu = 500 # mean of Gaussian pulse (simulated input spectrum)
sigma = np.sqrt(FWHM**2/(6*np.log(2)))
lambda_min = 400 # min wavelength in spectrum measurement
lambda_max = 600 # max wavelength in spectrum measurement
lambda_Mess = np.arange(lambda_min+bandwidth,lambda_max-bandwidth,delta_M)
Sfun = lambda lam: np.exp(-(lam-mu)**2/(2*sigma**2))
S = Sfun(lambda_Mess);
# generation of bandpass function
b,lambda_b = sim_bandpass(bandwidth,delta_b,skew,noiseb_up,noiseb_low)
figure (1);clf()
plot(lambda_b,b)
# simulation of the measurement
M = sim_measurement(Sfun,lambda_Mess,b,bandwidth,delta_b,noise_low,noise_up)
### plot intermediate results
figure(2);clf()
plot(lambda_Mess,S,lambda_Mess,M)
### application of Richardson-Lucy
Shat,RMS,curv = RichLucy(b,delta_b,M,maxiter,optFun=calcOptCurv)
### plot results
figure(3);clf()
plot(lambda_Mess,S,lambda_Mess,M,lambda_Mess,Shat)
xlabel('wavelength in nm')
ylabel('spectrum in a.u.')
legend(('input spectrum','measured spectrum', \
'Rich-Lucy estimate'),loc='best')
iters = np.arange(1,maxiter+1)
q = np.linspace(0.0,np.log10(maxiter),np.size(curv))
fig = figure(4);clf()
ax = fig.add_subplot(111)
ax.semilogx(iters,np.log10(RMS),10**q,curv)
ax.set_xlabel('iteration')
ax.set_ylabel('rms crit and its curv')
show()
return b,lambda_b,M,Shat,lambda_Mess
#########################################################################
############################# main function for command line use of RL
def __main():
from scipy.interpolate import interp1d
print '\n\n-------------------------------------------------------'
print 'Richardson-Lucy algorithm for bandpass correction command line tool'
print 'S. Eichstaedt, F Schmaehling (PTB) 2013'
print 'Usage of this software is without any warranty.'
print '\n-------------------------------------------------------'
if len(sys.argv)<4:
print 'Not enough input arguments! \nNeed at least [bandpass file] [measured spectrum file] and [output file]'
print 'Additional optional arguments are [maxiter] and [Monte Carlo runs]'
return
else:
bandpassfile = unicode(sys.argv[1])
measfile = unicode(sys.argv[2])
outputfile = unicode(sys.argv[3])
if len(sys.argv)>4:
maxiter = int(sys.argv[4])
if len(sys.argv)>5:
calcUnc = 1
MCruns = int(sys.argv[5])
else:
calcUnc = 0
else: # default values
maxiter = 500
calcUnc = 0
print 'RichLucy deconvolution parameters:'
print 'input files:\n ' + bandpassfile + '\n ' + measfile
print 'output file:\n ' + outputfile
print 'maximum number of iterations: %d' % maxiter
if calcUnc:
print 'Uncertainties will be calculated using %d Monte Carlo trials' % MCruns
print('-------------------------------------------------------\n')
print "---- Loading bandpass file:"
bdata = LoadFromFile(bandpassfile)
lb= bdata[:,0]
b = bdata[:,1]
if calcUnc:
m,n = bdata.shape
if n<3:
print "Bandpass data file does not contain uncertainties. Assigning zeros instead."
Ub = np.zeros_like(b)
else:
Ub = np.abs(bdata[:,2])
lb,b,interpolated = Interpolate2Equidist(lb,b)[:3]
deltab = lb[1]-lb[0]
if interpolated:
bout_interp = bandpassfile[:-4] + "_interp" + bandpassfile[-4:]
print "Saving interpolated bandpass to " + bout_interp
datei = open(bout_interp,'w')
header = "# Interpolated bandpass function\n" + "# wavelengths | interpolated bandpass \n"
datei.write(header)
Data = np.vstack((lb,b,))
Data = Data.transpose()
np.savetxt(datei,Data)
if calcUnc:
print "Note that for uncertainty evaluation the original data will be used."
intb = np.trapz(b,dx=deltab)
if np.abs(1-intb)>1e-4:
print 'Bandpass function is not normalized...doing it now.'
b = b/intb
centroid = np.trapz(b*lb,dx=deltab)
print 'central wavelength of bandpass function is %1.4f' % (centroid)
lambda_central = np.mean(lb)
print 'central wavelength of provided wavelength scale is %1.4f' % (lambda_central)
shift = centroid - lambda_central
if shift>1e-3:
print 'Suggested shift of wavelength scale is %1.4f'% (shift)
print "\n---- Loading measured spectrum file:"
mdata = LoadFromFile(measfile)
lm= mdata[:,0]
M = mdata[:,1]
if calcUnc:
m,n = mdata.shape
if n<3:
print "Spectrum data file does not contain uncertainties. Assigning zeros instead."
UM = np.zeros_like(M)
else:
UM = np.abs(mdata[:,2])
lm,M,interpolated_M = Interpolate2SameStep(M,lm,deltab)[:3]
deltam = lm[1]-lm[0]
if interpolated_M:
Mout_interp = measfile[:-4] + "_interp" + measfile[-4:]
print "Saving interpolated spectrum to " + Mout_interp
datei = open(Mout_interp,'w')
header = "# Interpolated measured spectrum\n" + "# wavelengths | interpolated bandpass \n"
datei.write(header)
Data = np.vstack((lm,M,))
Data = Data.transpose()
np.savetxt(datei,Data)
if calcUnc:
print "Note that for the uncertainty evaluation the original data will be used."
print 'Input data loaded successfully'
print 'bandpass function:'
print ' wavelength scale: [%1.3f,%1.3f]' % (lb[0],lb[-1])
print ' wavelength step: %1.4f' % (deltab)
print 'measured spectrum:'
print ' wavelength scale: [%1.3f,%1.3f]' % (lm[0],lm[-1])
print ' wavelength step: %1.4f' % (deltam)
if calcUnc:
m1,n1 = bdata.shape
m2,n2 = mdata.shape
if n1<3 and n2<3:
print "No uncertainties assigned to the input data. Skipping uncertainty evaluation."
calcUnc = False
if calcUnc:
Shat,UShat = RichLucyUncertainty(bdata[:,1],Ub,bdata[:,0],\
mdata[:,1],UM,mdata[:,0],\
runs=MCruns,maxiter=maxiter)
else:
if interpolated_M:
print('Measured spectrum required interpolation.')
print('Using spline interpolation for measurement data and wavelength\n step length of bandpass measurement.')
interp_outputfile = outputfile[:-4] + '_interp' + outputfile[-4:]
Shat_interp = RichLucy(b,deltab,M,maxiter)[0]
print "RichLucy finished. Interpolating estimate back to original wavelength scale."
spline_Shat = interp1d(lm,Shat_interp,bounds_error=False,kind='linear')
Shat = spline_Shat(mdata[:,0])
else:
Shat = RichLucy(b,deltab,M,maxiter)[0]
header1 = "# Result of Richardson-Lucy deconvolution calculated using the RichLucy command line version " + repr(version) + "\n"
if interpolated_M:
header2 = "# Estimation did require interpolation. deltab = " \
+ repr(deltab) + ", deltaM = " + repr(deltam) + "\n"\
"# Interpolation results are stored in " + interp_outputfile + "\n"
else:
header2 = ""
header3 = "# bandpass function centroid was at " + repr(centroid) \
+ " whereas central wavelength value was " + repr(lambda_central) + "\n"\
+ "# Suggested shift of wavelength scale is " + repr(shift) + "\n"
if calcUnc:
header4 = "# wavelengths | estimated spectrum | uncertainty (covariance matrix)\n"
else:
header4 = "# wavelengths | estimated spectrum\n"
header = header1 + header2 + header3 + header4
print "Saving results to file..."
datei = open(outputfile,'w')
datei.write(header)
if calcUnc:
Results = np.vstack((mdata[:,0],Shat,UShat))
else:
Results = np.vstack((mdata[:,0],Shat))
Results = Results.transpose()
np.savetxt(datei,Results)
print 'Saved result to ' + outputfile
print 'Format is: ' + header4
if interpolated_M:
datei = open(interp_outputfile,'w')
header = "# Interpolation result" + header1[8:] + header4
datei.write(header)
Data = np.vstack((lm,Shat_interp,))
Data = Data.transpose()
np.savetxt(datei,Data)
print 'Saved interpolated result to ' + interp_outputfile
print 'All done'
########################################################################
####################################################################
if __name__ == '__main__':
__main()