#!/usr/bin/env python
"""GROUPFIT.PY - Fit groups of stars in an image
"""
__authors__ = 'David Nidever <dnidever@montana.edu?'
__version__ = '20210826' # yyyymmdd
import os
import sys
import numpy as np
import scipy
import warnings
from astropy.io import fits
from astropy.table import Table
import astropy.units as u
from scipy.optimize import curve_fit, least_squares
from scipy.interpolate import interp1d
from scipy import sparse
#from astropy.nddata import CCDData,StdDevUncertainty
from dlnpyutils import utils as dln, bindata
import copy
import logging
import time
import matplotlib
import sep
from photutils.aperture import CircularAnnulus
from astropy.stats import sigma_clipped_stats
from . import leastsquares as lsq,utils
from .ccddata import CCDData,BoundingBox
# Fit a PSF model to multiple stars in an image
[docs]class GroupFitter(object):
def __init__(self,psf,image,cat,fitradius=None,verbose=False):
# Save the input values
self.verbose = verbose
self.psf = psf
self.image = image
self.cat = cat
self.nstars = np.size(cat) # number of stars
self.niter = 0 # number of iterations in the solver
self.npsfpix = psf.npix # shape of PSF
ny,nx = image.data.shape # save image dimensions, python images are (Y,X)
self.nx = nx
self.ny = ny
if fitradius is None: # PSF fitting radius
fitradius = psf.fwhm()
self.fitradius = fitradius
self.nfitpix = int(np.ceil(fitradius)) # +/- nfitpix
# Star heights
if 'height' in cat.colnames:
starheight = cat['height'].copy()
else:
# estimate height from flux and fwhm
# area under 2D Gaussian is 2*pi*A*sigx*sigy
if 'fwhm' in cat.columns:
height = cat['flux']/(2*np.pi*(cat['fwhm']/2.35)**2)
else:
height = cat['flux']/(2*np.pi*(psf.fwhm()/2.35)**2)
starheight = np.maximum(height,0) # make sure it's positive
# Initialize the parameter array
pars = np.zeros(self.nstars*3,float) # height, xcen, ycen
pars[0::3] = starheight
pars[1::3] = cat['x']
pars[2::3] = cat['y']
self.pars = pars
# Sky and Niter arrays for the stars
self.starsky = np.zeros(self.nstars,float)
self.starniter = np.zeros(self.nstars,int)
self.njaciter = 0 # initialize njaciter
# Initialize the freezepars and freezestars arrays
self.freezestars = np.zeros(self.nstars,bool)
self.freezepars = np.zeros(self.nstars*3,bool)
self.pixused = None # initialize pixused
# Get xdata, ydata
bboxdata0 = []
xlist0 = []
ylist0 = []
fbboxdata = []
fxlist = []
fylist = []
ntotpix = 0
hpsfnpix = self.psf.npix//2
for i in range(self.nstars):
xcen = self.starxcen[i]
ycen = self.starycen[i]
# Full PSF region
fbbox = psf.starbbox((xcen,ycen),image.shape,hpsfnpix)
fx,fy = psf.bbox2xy(fbbox)
frr = np.sqrt( (fx-xcen)**2 + (fy-ycen)**2 )
# Use image mask
# mask=True for bad values
if image.mask is not None:
fmask = (frr<=hpsfnpix) & (image.mask[fy,fx]==False)
else:
fmask = frr<=hpsfnpix
fx = fx[fmask] # raveled
fy = fy[fmask]
fbboxdata.append(fbbox)
fxlist.append(fx)
fylist.append(fy)
# Fitting region
bbox = psf.starbbox((xcen,ycen),image.shape,self.nfitpix)
x,y = psf.bbox2xy(bbox)
rr = np.sqrt( (x-xcen)**2 + (y-ycen)**2 )
# Use image mask
# mask=True for bad values
if image.mask is not None:
mask = (rr<=self.fitradius) & (image.mask[y,x]==False)
else:
mask = rr<=self.fitradius
x = x[mask] # raveled
y = y[mask]
ntotpix += x.size
bboxdata0.append(bbox) # this still includes the corners
xlist0.append(x)
ylist0.append(y)
self.fxlist = fxlist # full PSF region
self.fylist = fylist
self.xlist0 = xlist0 # fitting region
self.ylist0 = ylist0
self.bboxdata0 = bboxdata0
# Combine all of the X and Y values (of the pixels we are fitting) into one array
xall = np.zeros(ntotpix,int)
yall = np.zeros(ntotpix,int)
count = 0
for i in range(self.nstars):
n = len(xlist0[i])
xall[count:count+n] = xlist0[i]
yall[count:count+n] = ylist0[i]
count += n
# Create 1D unraveled indices, python images are (Y,X)
ind1 = np.ravel_multi_index((yall,xall),image.shape)
# Get unique indexes and inverse indices
# the inverse index list takes you from the duplicated pixels
# to the unique ones
uind1,invindex = np.unique(ind1,return_inverse=True)
ntotpix = len(uind1)
y,x = np.unravel_index(uind1,image.shape)
imflatten = image.data.ravel()[uind1]
errflatten = image.error.ravel()[uind1]
# Save information on the "flattened" arrays
self.ntotpix = ntotpix
self.imflatten = imflatten
self.resflatten = imflatten.copy()
self.errflatten = errflatten
self.ind1 = uind1
self.x = x
self.y = y
# We want to know for each star which pixels (that are being fit)
# are affected by it (within it's full pixel list, not just
# "its fitted pixels").
invindexlist = []
xlist = []
ylist = []
pixim = np.zeros(image.shape,bool)
for i in range(self.nstars):
fx,fy = fxlist[i],fylist[i]
# which of these are contained within the final, unique
# list of fitted pixels (X,Y)?
pixim[fy,fx] = True
used, = np.where(pixim[y,x]==True)
invindexlist.append(used)
xlist.append(x[used])
ylist.append(y[used])
pixim[fy,fx] = False # reset
self.invindexlist = invindexlist
self.xlist = xlist
self.ylist = ylist
#self.invindex = invindex # takes you from duplicates to unique pixels
#invindexlist = []
#count = 0
#for i in range(self.nstars):
# n = len(self.xlist[i])
# invindexlist.append(invindex[count:count+n])
# count += n
#self.invindexlist = invindexlist
self.bbox = BoundingBox(np.min(x),np.max(x)+1,np.min(y),np.max(y)+1)
#self.bboxdata = bboxdata
# Create initial sky image
self.sky()
@property
def starpars(self):
""" Return the [height,xcen,ycen] parameters in [Nstars,3] array.
You can GET a star's parameters like this:
pars = self.starpars[4]
You can also SET a star's parameters a similar way:
self.starpars[4] = pars
"""
return self.pars.reshape(self.nstars,3)
@property
def starheight(self):
""" Return the best-fit heights for all stars."""
return self.pars[0::3]
@starheight.setter
def starheight(self,val):
""" Set starheight values."""
self.pars[0::3] = val
@property
def starxcen(self):
""" Return the best-fit X centers for all stars."""
return self.pars[1::3]
@starxcen.setter
def starxcen(self,val):
""" Set starxcen values."""
self.pars[1::3] = val
@property
def starycen(self):
""" Return the best-fit Y centers for all stars."""
return self.pars[2::3]
@starycen.setter
def starycen(self,val):
""" Set starycen values."""
self.pars[2::3] = val
[docs] def sky(self,method='sep',rin=None,rout=None):
""" (Re)calculate the sky."""
# Remove the current best-fit model
resid = self.image.data-self.modelim # remove model
# SEP smoothly varying background
if method=='sep':
bw = np.maximum(int(self.nx/10),64)
bh = np.maximum(int(self.ny/10),64)
bkg = sep.Background(resid, mask=None, bw=bw, bh=bh, fw=3, fh=3)
self.skyim = bkg.back()
# Calculate sky value for each star
# use center position
self.starsky[:] = self.skyim[np.round(self.starycen).astype(int),np.round(self.starxcen).astype(int)]
# Annulus aperture
elif method=='annulus':
if rin is None:
rin = self.psf.fwhm()*1.5
if rout is None:
rout = self.psf.fwhm()*2.5
positions = list(zip(self.starxcen,self.starycen))
annulus = CircularAnnulus(positions,r_in=rin,r_out=rout)
for i in range(self.nstars):
annulus_mask = annulus[i].to_mask(method='center')
annulus_data = annulus_mask.multiply(resid,fill_value=np.nan)
data = annulus_data[(annulus_mask.data>0) & np.isfinite(annulus_data)]
mean_sigclip, median_sigclip, _ = sigma_clipped_stats(data,stdfunc=dln.mad)
self.starsky[i] = mean_sigclip
if hasattr(self,'skyim') is False:
self.skyim = np.zeros(self.image.shape,float)
if self.skyim is None:
self.skyim = np.zeros(self.image.shape,float)
self.skyim += np.median(self.starsky)
else:
raise ValueError("Sky method "+method+" not supported")
@property
def skyflatten(self):
""" Return the sky values for the pixels that we are fitting."""
return self.skyim.ravel()[self.ind1]
@property
def nfreezepars(self):
""" Return the number of frozen parameters."""
return np.sum(self.freezepars)
@property
def freepars(self):
""" Return the free parameters."""
return ~self.freezepars
@property
def nfreepars(self):
""" Return the number of free parameters."""
return np.sum(self.freepars)
@property
def nfreezestars(self):
""" Return the number of frozen stars."""
return np.sum(self.freezestars)
@property
def freestars(self):
""" Return the free stars."""
return ~self.freezestars
@property
def nfreestars(self):
""" Return the number of free stars."""
return np.sum(self.freestars)
[docs] def freeze(self,pars,frzpars):
""" Freeze stars and parameters"""
# PARS: best-fit values of free parameters
# FRZPARS: boolean array of which "free" parameters
# should now be frozen
# Update all the free parameters
self.pars[self.freepars] = pars
# Update freeze values for "free" parameters
self.freezepars[self.freepars] = frzpars # stick in the new values for the "free" parameters
# Check if we need to freeze any new parameters
nfrz = np.sum(frzpars)
if nfrz==0:
return pars
# Freeze new stars
oldfreezestars = self.freezestars.copy()
self.freezestars = np.sum(self.freezepars.reshape(self.nstars,3),axis=1)==3
#self.nfreezestars = np.sum(self.freezestars)
# Subtract model for newly frozen stars
newfreezestars, = np.where((oldfreezestars==False) & (self.freezestars==True))
if len(newfreezestars)>0:
# add models to a full image
newmodel = self.image.data.copy()*0
for i in newfreezestars:
# Save on what iteration this star was frozen
self.starniter[i] = self.niter+1
#print('freeze: subtracting model for star ',i)
pars1 = self.pars[i*3:(i+1)*3]
#xind = self.xlist[i]
#yind = self.ylist[i]
#invindex = self.invindexlist[i]
#im1 = self.psf(xind,yind,pars1)
#self.resflatten[invindex] -= im1
xind = self.fxlist[i]
yind = self.fylist[i]
im1 = self.psf(xind,yind,pars1)
newmodel[yind,xind] += im1
# Only keep the pixels being fit
# and subtract from the residuals
newmodel1 = newmodel.ravel()[self.ind1]
self.resflatten -= newmodel1
# Return the new array of free parameters
frzind = np.arange(len(frzpars))[frzpars]
pars = np.delete(pars,frzind)
return pars
[docs] def unfreeze(self):
""" Unfreeze all parameters and stars."""
self.freezestars = np.zeros(self.nstars,bool)
self.freezepars = np.zeros(self.nstars*3,bool)
self.resflatten = self.imflatten.copy()
[docs] def mkbounds(self,pars,imshape,xoff=10):
""" Make bounds for a set of input parameters."""
# is [height1,xcen1,ycen1,height2,xcen2,ycen2, ...]
npars = len(pars)
ny,nx = imshape
# Make bounds
lbounds = np.zeros(npars,float)
ubounds = np.zeros(npars,float)
lbounds[0::3] = 0
lbounds[1::3] = np.maximum(pars[1::3]-xoff,0)
lbounds[2::3] = np.maximum(pars[2::3]-xoff,0)
ubounds[0::3] = np.inf
ubounds[1::3] = np.maximum(pars[1::3]+xoff,nx-1)
ubounds[2::3] = np.maximum(pars[2::3]+xoff,ny-1)
bounds = (lbounds,ubounds)
return bounds
[docs] def checkbounds(self,pars,bounds):
""" Check the parameters against the bounds."""
# 0 means it's fine
# 1 means it's beyond the lower bound
# 2 means it's beyond the upper bound
npars = len(pars)
lbounds,ubounds = bounds
check = np.zeros(npars,int)
check[pars<=lbounds] = 1
check[pars>=ubounds] = 2
return check
[docs] def limbounds(self,pars,bounds):
""" Limit the parameters to the boundaries."""
lbounds,ubounds = bounds
outpars = np.minimum(np.maximum(pars,lbounds),ubounds)
return outpars
[docs] def limsteps(self,steps,maxsteps):
""" Limit the parameter steps to maximum step sizes."""
signs = np.sign(steps)
outsteps = np.minimum(np.abs(steps),maxsteps)
outsteps *= signs
return outsteps
[docs] def steps(self,pars,bounds=None,dx=0.2):
""" Return step sizes to use when fitting the stellar parameters."""
npars = len(pars)
fsteps = np.zeros(npars,float)
fsteps[0::3] = np.maximum(np.abs(pars[0::3])*0.25,1)
fsteps[1::3] = dx
fsteps[2::3] = dx
return fsteps
[docs] def newpars(self,pars,steps,bounds,maxsteps):
""" Get new parameters given initial parameters, steps and constraints."""
# Limit the steps to maxsteps
limited_steps = self.limsteps(steps,maxsteps)
# Make sure that these don't cross the boundaries
lbounds,ubounds = bounds
check = self.checkbounds(pars+limited_steps,bounds)
# Reduce step size for any parameters to go beyond the boundaries
badpars = (check!=0)
# reduce the step sizes until they are within bounds
newsteps = limited_steps.copy()
count = 0
maxiter = 2
while (np.sum(badpars)>0 and count<=maxiter):
newsteps[badpars] /= 2
newcheck = self.checkbounds(pars+newsteps,bounds)
badpars = (newcheck!=0)
count += 1
# Final parameters
newpars = pars + newsteps
# Make sure to limit them to the boundaries
check = self.checkbounds(newpars,bounds)
badpars = (check!=0)
if np.sum(badpars)>0:
# add a tiny offset so it doesn't fit right on the boundary
newpars = np.minimum(np.maximum(newpars,lbounds+1e-30),ubounds-1e-30)
return newpars
@property
def modelim(self):
""" This returns the full image of the current best model (no sky)
using the PARS values."""
im = np.zeros(self.image.shape,float)
for i in range(self.nstars):
pars = self.pars[i*3:(i+1)*3]
fxind = self.fxlist[i]
fyind = self.fylist[i]
im1 = self.psf(fxind,fyind,pars)
im[fyind,fxind] += im1
return im
@property
def modelflatten(self):
""" This returns the current best model (no sky) for only the "flatten" pixels
using the PARS values."""
return self.model(np.arange(10),*self.pars,allparams=True,verbose=False)
[docs] def model(self,x,*args,trim=False,allparams=False,verbose=None):
""" Calculate the model for the pixels we are fitting."""
# ALLPARAMS: all of the parameters were input
if verbose is None and self.verbose:
print('model: ',self.niter,args)
# Args are [height,xcen,ycen,sky] for all Nstars
# so 3*Nstars parameters
psf = self.psf
# Figure out the parameters of ALL the stars
# some stars and parameters are FROZEN
if self.nfreezepars>0 and allparams is False:
allpars = self.pars
allpars[self.freepars] = args
else:
allpars = args
x0,x1 = self.bbox.xrange
y0,y1 = self.bbox.yrange
nx = x1-x0-1
ny = y1-y0-1
#im = np.zeros((nx,ny),float) # image covered by star
allim = np.zeros(self.ntotpix,float)
usepix = np.zeros(self.ntotpix,bool)
# Loop over the stars and generate the model image
# ONLY LOOP OVER UNFROZEN STARS
if allparams is False:
dostars = np.arange(self.nstars)[self.freestars]
else:
dostars = np.arange(self.nstars)
for i in dostars:
pars = allpars[i*3:(i+1)*3]
xind = self.xlist[i]
yind = self.ylist[i]
invindex = self.invindexlist[i]
im1 = psf(xind,yind,pars)
allim[invindex] += im1
usepix[invindex] = True
self.usepix = usepix
nusepix = np.sum(usepix)
if trim and nusepix<self.ntotpix:
unused = np.arange(self.ntotpix)[~usepix]
allim = np.delete(allim,unused)
self.niter += 1
return allim
[docs] def jac(self,x,*args,retmodel=False,trim=False,allparams=False,verbose=None):
""" Calculate the jacobian for the pixels and parameters we are fitting"""
if verbose is None and self.verbose:
print('jac: ',self.njaciter,args)
# Args are [height,xcen,ycen,sky] for all Nstars
# so 3*Nstars parameters
psf = self.psf
# Figure out the parameters of ALL the stars
# some stars and parameters are FROZEN
if self.nfreezepars>0 and allparams is False:
allpars = self.pars
if len(args) != (len(self.pars)-self.nfreezepars):
print('problem')
import pdb; pdb.set_trace()
allpars[self.freepars] = args
else:
allpars = args
x0,x1 = self.bbox.xrange
y0,y1 = self.bbox.yrange
nx = x1-x0-1
ny = y1-y0-1
#jac = np.zeros((nx,ny,len(args)),float) # image covered by star
jac = np.zeros((self.ntotpix,len(self.pars)),float) # image covered by star
usepix = np.zeros(self.ntotpix,bool)
if retmodel:
im = np.zeros(self.ntotpix,float)
# Loop over the stars and generate the model image
# ONLY LOOP OVER UNFROZEN STARS
if allparams is False:
dostars = np.arange(self.nstars)[self.freestars]
else:
dostars = np.arange(self.nstars)
for i in dostars:
pars = allpars[i*3:(i+1)*3]
#bbox = self.bboxdata[i]
xind = self.xlist[i]
yind = self.ylist[i]
invindex = self.invindexlist[i]
xdata = (xind,yind)
if retmodel:
m,jac1 = psf.jac(xdata,*pars,retmodel=True)
else:
jac1 = psf.jac(xdata,*pars)
jac[invindex,i*3] = jac1[:,0]
jac[invindex,i*3+1] = jac1[:,1]
jac[invindex,i*3+2] = jac1[:,2]
#jac[invindex,i*4+3] = jac1[:,3]
if retmodel:
im[invindex] += m
usepix[invindex] = True
# Remove frozen columns
if self.nfreezepars>0 and allparams is False:
jac = np.delete(jac,np.arange(len(self.pars))[self.freezepars],axis=1)
self.usepix = usepix
nusepix = np.sum(usepix)
# Trim out unused pixels
if trim and nusepix<self.ntotpix:
unused = np.arange(self.ntotpix)[~usepix]
jac = np.delete(jac,unused,axis=0)
if retmodel:
im = np.delete(im,unused)
self.njaciter += 1
if retmodel:
return im,jac
else:
return jac
[docs] def heightfit(self,trim=True):
""" Fit the heights only for the stars."""
# linear least squares problem
# Ax = b
# A is the set of models pixel values for height, [Npix, Nstar]
# x is heights [Nstar] we are solving for
# b is pixel values, oru residflatten values
# All parameters
allpars = self.pars
A = np.zeros((self.ntotpix,self.nstars),float)
usepix = np.zeros(self.ntotpix,bool)
# Loop over the stars and generate the model image
# ONLY LOOP OVER UNFROZEN STARS
dostars = np.arange(self.nstars)[self.freestars]
guess = np.zeros(self.nfreestars,float)
for count,i in enumerate(dostars):
pars = allpars[i*3:(i+1)*3].copy()
guess[count] = pars[0]
pars[0] = 1.0 # unit height
xind = self.xlist[i]
yind = self.ylist[i]
invindex = self.invindexlist[i]
im1 = self.psf(xind,yind,pars)
A[invindex,i] = im1
usepix[invindex] = True
nusepix = np.sum(usepix)
# Residual data
dy = self.resflatten-self.skyflatten
if trim and nusepix<self.ntotpix:
unused = np.arange(self.ntotpix)[~usepix]
A = np.delete(A,unused,axis=0)
dy = dy[usepix]
from scipy import sparse
A = sparse.csc_matrix(A) # make sparse
# Use guess to get close to the solution
dy2 = dy - A.dot(guess)
par = sparse.linalg.lsqr(A,dy2,atol=1e-4,btol=1e-4)
dheight = par[0]
height = guess+dheight
# preconditioning!
return height
[docs] def centroid(self):
""" Centroid all of the stars."""
# Start with the residual image and all stars subtracted.
# Then add in the model of one star at a time and centroid it with the best-fitting model.
# All parameters
allpars = self.pars
resid = self.image.data.copy()-self.skyim
# Generate full models
# Loop over the stars and generate the model image
# ONLY LOOP OVER UNFROZEN STARS
fmodels = []
#fjac = []
models = []
jac = []
dostars = np.arange(self.nstars)[self.freestars]
usepix = np.zeros(self.ntotpix,bool)
for count,i in enumerate(dostars):
pars = self.starpars[i]
#pars = allpars[i*3:(i+1)*3]
# Full models
fxind = self.fxlist[i]
fyind = self.fylist[i]
fim1 = self.psf(fxind,fyind,pars)
resid[fyind,fxind] -= fim1
fmodels.append(fim1)
#fjac.append(fjac1)
#fusepix[finvindex] = True
# Only fitting models
xind = self.xlist[i]
yind = self.ylist[i]
invindex = self.invindexlist[i]
im1,jac1 = self.psf.jac((xind,yind),*pars,retmodel=True)
models.append(im1)
jac.append(jac1)
#usepix[invindex] = True
# Loop over all free stars and fit centroid
xnew = np.zeros(self.nfreestars,float)
ynew = np.zeros(self.nfreestars,float)
for count,i in enumerate(dostars):
pars = self.starpars[i]
freezepars = self.freezepars[i*3:(i+1)*3]
xnew[count] = pars[1]
ynew[count] = pars[2]
# Both x/y frozen
if freezepars[1]==True and freezepars[2]==True:
continue
# Add the model for this star back in
fxind = self.fxlist[i]
fyind = self.fylist[i]
fmodel1 = fmodels[count]
#resid[fyind,fxind] += fmodel1
# crowdsource, does a sum
# y = 2 / integral(P*P*W) * integral(x*(I-P)*W)
# where x = x/y coordinate, I = isolated stamp, P = PSF model, W = weight
# Use the derivatives instead
xind = self.xlist[i]
yind = self.ylist[i]
jac1 = jac[count]
jac1 = np.delete(jac1,0,axis=1) # delete height column
resid1 = resid[yind,xind]
# CHOLESKY_JAC_SOLVE NEEDS THE ACTUAL RESIDUALS!!!!
# with the star removed!!
# Use cholesky to solve
# If only one position frozen, solve for the free one
# X frozen, solve for Y
if freezepars[1]==True:
jac1 = np.delete(jac1,0,axis=1)
dbeta = cholesky_jac_solve(jac1,resid1)
ynew[count] += dbeta[0]
# Y frozen, solve for X
elif freezepars[2]==True:
jac1 = np.delete(jac1,1,axis=1)
dbeta = cholesky_jac_solve(jac1,resid1)
xnew[count] += dbeta[0]
# Solve for both X and Y
else:
dbeta = cholesky_jac_solve(jac1,resid1)
xnew[count] += dbeta[0]
ynew[count] += dbeta[1]
# Remove the star again
#resid[fxind,fyind] -= fmodel1
return xnew,ynew
[docs] def cov(self):
""" Determine the covariance matrix."""
# https://stats.stackexchange.com/questions/93316/parameter-uncertainty-after-non-linear-least-squares-estimation
# more background here, too: http://ceres-solver.org/nnls_covariance.html
xdata = np.arange(self.ntotpix)
# Hessian = J.T * T, Hessian Matrix
# higher order terms are assumed to be small
# https://www8.cs.umu.se/kurser/5DA001/HT07/lectures/lsq-handouts.pdf
mjac = self.jac(xdata,*self.pars,allparams=True,trim=False,verbose=False)
# Weights
# If weighted least-squares then
# J.T * W * J
# where W = I/sig_i**2
wt = np.diag(1/self.errflatten**2)
hess = mjac.T @ (wt @ mjac)
#hess = mjac.T @ mjac # not weighted
# cov = H-1, covariance matrix is inverse of Hessian matrix
cov_orig = lsq.inverse(hess)
# Rescale to get an unbiased estimate
# cov_scaled = cov * (RSS/(m-n)), where m=number of measurements, n=number of parameters
# RSS = residual sum of squares
# using rss gives values consistent with what curve_fit returns
bestmodel = self.model(xdata,*self.pars,allparams=True,trim=False,verbose=False)
resid = self.imflatten-self.skyflatten-bestmodel
#cov = cov_orig * (np.sum(resid**2)/(self.ntotpix-len(self.pars)))
# Use chi-squared, since we are doing the weighted least-squares and weighted Hessian
chisq = np.sum(resid**2/self.errflatten**2)
cov = cov_orig * (chisq/(self.ntotpix-len(self.pars))) # what MPFITFUN suggests, but very small
# cov = lqr.jac_covariange(mjac,resid,wt)
return cov
[docs]def fit(psf,image,cat,method='qr',fitradius=None,recenter=True,maxiter=10,minpercdiff=0.5,
reskyiter=2,nofreeze=False,absolute=False,verbose=False):
"""
Fit PSF to group of stars in an image.
Parameters
----------
psf : PSF object
PSF object with initial parameters to use.
image : CCDData object
Image to use to fit PSF model to stars.
cat : table
Catalog with initial height/x/y values for the stars to use to fit the PSF.
method : str, optional
Method to use for solving the non-linear least squares problem: "cholesky",
"qr", "svd", and "curve_fit". Default is "cholesky".
fitradius: float, optional
The fitting radius in pixels. By default the PSF FWHM is used.
recenter : boolean, optional
Allow the centroids to be fit. Default is True.
maxiter : int, optional
Maximum number of iterations to allow. Only for methods "cholesky", "qr" or "svd".
Default is 10.
minpercdiff : float, optional
Minimum percent change in the parameters to allow until the solution is
considered converged and the iteration loop is stopped. Only for methods
"cholesky", "qr" and "svd". Default is 0.5.
reskyiter : int, optional
After how many iterations to re-calculate the sky background. Default is 2.
absolute : boolean, optional
Input and output coordinates are in "absolute" values using the image bounding box.
Default is False, everything is relative.
nofreeze : boolean, optional
Do not freeze any parameters even if they have converged. Default is False.
verbose : boolean, optional
Verbose output.
Returns
-------
out : table
Table of best-fitting parameters for each star.
id, height, height_error, x, x_err, y, y_err, sky
model : numpy array
Best-fitting model of the stars and sky background.
sky : numpy array
Best-fitting sky image.
Example
-------
outcat,model,sky = fit(psf,image,cat)
"""
start = time.time()
print = utils.getprintfunc() # Get print function to be used locally, allows for easy logging
# Check input catalog
for n in ['x','y']:
if n not in cat.keys():
raise ValueError('Cat must have x and y columns')
# Check the method
method = str(method).lower()
if method not in ['cholesky','svd','qr','sparse','htcen','curve_fit']:
raise ValueError('Only cholesky, svd, qr, sparse, htcen or curve_fit methods currently supported')
# Make sure image is CCDData
if isinstance(image,CCDData) is False:
image = CCDData(image)
nstars = np.array(cat).size
# Image offsets
if absolute:
imx0 = image.bbox.xrange[0]
imy0 = image.bbox.yrange[0]
cat['x'] -= imx0
cat['y'] -= imy0
# Start the Group Fitter
gf = GroupFitter(psf,image,cat,fitradius=fitradius,verbose=(verbose>=2))
xdata = np.arange(gf.ntotpix)
# Centroids fixed
if recenter==False:
gf.freezepars[1::3] = True # freeze X values
gf.freezepars[2::3] = True # freeze Y values
# Perform the fitting
#--------------------
# Initial estimates
initpar = np.zeros(nstars*3,float)
initpar[0::3] = gf.starheight
initpar[1::3] = cat['x']
initpar[2::3] = cat['y']
# Make bounds
# this requires all 3*Nstars parameters to be input
bounds = gf.mkbounds(initpar,image.shape)
# Curve_fit
# dealt with separately
if method=='curve_fit':
# Perform the fitting
bounds = [np.zeros(gf.nstars*3,float)-np.inf,
np.zeros(gf.nstars*3,float)+np.inf]
bounds[0][0::3] = 0
if recenter:
bounds[0][1::3] = cat['x']-2
bounds[1][1::3] = cat['x']+2
bounds[0][2::3] = cat['y']-2
bounds[1][2::3] = cat['y']+2
else:
bounds[0][1::3] = cat['x']-1e-7
bounds[1][1::3] = cat['x']+1e-7
bounds[0][2::3] = cat['y']-1e-7
bounds[1][2::3] = cat['y']+1e-7
bestpar,cov = curve_fit(gf.model,xdata,gf.imflatten-gf.skyflatten,bounds=bounds,
sigma=gf.errflatten,p0=initpar,jac=gf.jac)
bestmodel = gf.model(xdata,*bestpar)
perror = np.sqrt(np.diag(cov))
chisq = np.sum((gf.imflatten-gf.skyflatten-bestmodel)**2/gf.errflatten**2)
gf.pars = bestpar
gf.chisq = chisq
# All other fitting methods
else:
# Iterate
bestpar_all = initpar.copy()
# centroids fixed, only keep heights
if recenter==False:
initpar = initpar[0::3]
gf.niter = 0
maxpercdiff = 1e10
bestpar = initpar.copy()
npars = len(bestpar)
maxsteps = gf.steps(gf.pars,bounds) # maximum steps, requires all 3*Nstars parameters
while (gf.niter<maxiter and maxpercdiff>minpercdiff):
start0 = time.time()
# Jacobian solvers
if method != 'htcen':
# Get the Jacobian and model
# only for pixels that are affected by the "free" parameters
m,jac = gf.jac(xdata,*bestpar,retmodel=True,trim=True)
# Residuals
dy = gf.resflatten[gf.usepix]-gf.skyflatten[gf.usepix]-m
# Weights
wt = 1/gf.errflatten[gf.usepix]**2
# Solve Jacobian
dbeta_free = lsq.jac_solve(jac,dy,method=method,weight=wt)
dbeta_free[~np.isfinite(dbeta_free)] = 0.0 # deal with NaNs, shouldn't happen
dbeta = np.zeros(len(gf.pars),float)
dbeta[gf.freepars] = dbeta_free
# htcen, crowdsource method of solving heights/fluxes first
# and then centroiding to get x/y
else:
dbeta = np.zeros(3*gf.nfreestars,float)
# Solve for the heights
newht = gf.heightfit()
dbeta[0::3] = gf.starheight[gf.freestars] - newht
gf.starheight[gf.freestars] = newht # update the heights
# Solve for the positions
newx, newy = gf.centroid()
dbeta[1::3] = gf.starxcen[~gf.freezestars] - newx
dbeta[2::3] = gf.starycen[~gf.freezestars] - newy
gf.starxcen[gf.freestars] = newx
gf.starycen[gf.freestars] = newy
# trim frozen parameters from free stars
freestars = np.arange(gf.nstars)[gf.freestars]
freepars = np.zeros(3*gf.nstars,bool)
for count,i in enumerate(freestars):
freepars1 = gf.freepars[i*3:(i+1)*3]
freepars[count*3:(count+1)*3] = freepars1
dbeta_free = dbeta[freepars]
# Update parameters
oldpar = bestpar.copy()
oldpar_all = bestpar_all.copy()
bestpar_all = gf.newpars(gf.pars,dbeta,bounds,maxsteps)
bestpar = bestpar_all[gf.freepars]
# Check differences and changes
diff_all = np.abs(bestpar_all-oldpar_all)
percdiff_all = diff_all.copy()*0
percdiff_all[0::3] = diff_all[0::3]/np.maximum(oldpar_all[0::3],0.0001)*100 # height
percdiff_all[1::3] = diff_all[1::3]*100 # x
percdiff_all[2::3] = diff_all[2::3]*100 # y
diff = diff_all[gf.freepars]
percdiff = percdiff_all[gf.freepars]
# Freeze parameters/stars that converged
# also subtract models of fixed stars
# also return new free parameters
if not nofreeze:
frzpars = percdiff<=minpercdiff
freeparsind, = np.where(~gf.freezepars)
bestpar = gf.freeze(bestpar,frzpars)
npar = len(bestpar)
if verbose:
print('Nfrozen pars = '+str(gf.nfreezepars))
print('Nfrozen stars = '+str(gf.nfreezestars))
print('Nfree pars = '+str(npar))
else:
gf.pars = bestpar
maxpercdiff = np.max(percdiff)
# Get model and chisq
if method != 'curve_fit':
bestmodel = gf.model(xdata,*gf.pars,allparams=True)
resid = gf.imflatten-gf.skyflatten-bestmodel
chisq = np.sum(resid**2/gf.errflatten**2)
gf.chisq = chisq
if verbose:
print('Iter = '+str(gf.niter))
print('Pars = '+str(gf.pars))
print('Percent diff = '+str(percdiff))
print('Diff = '+str(diff))
print('chisq = '+str(chisq))
# Re-estimate the sky
if gf.niter % reskyiter == 0:
print('Re-estimating the sky')
gf.sky()
if verbose:
print('iter dt = %.2f sec' % (time.time()-start0))
gf.niter += 1 # increment counter
bad, = np.where((gf.pars[0::3]>1e10) | (gf.pars[1::3]<-1) | (gf.pars[1::3]>3390) | (gf.pars[2::3]<-1) | (gf.pars[2::3]>2710))
if len(bad)>0:
import pdb; pdb.set_trace()
# Check that all starniter are set properly
# if we stopped "prematurely" then not all stars were frozen
# and didn't have starniter set
gf.starniter[gf.starniter==0] = gf.niter
# Make final model
gf.unfreeze()
model = CCDData(gf.modelim,bbox=image.bbox,unit=image.unit)
# Estimate uncertainties
if method != 'curve_fit':
# Calculate covariance matrix
cov = gf.cov()
perror = np.sqrt(np.diag(cov))
pars = gf.pars
if verbose:
print('Best-fitting parameters: '+str(pars))
print('Errors: '+str(perror))
# Put in catalog
# Initialize catalog
dt = np.dtype([('id',int),('height',float),('height_error',float),('x',float),
('x_error',float),('y',float),('y_error',float),('sky',float),
('flux',float),('flux_error',float),('mag',float),('mag_error',float),
('rms',float),('chisq',float),('niter',int)])
outcat = np.zeros(nstars,dtype=dt)
if 'id' in cat.keys():
outcat['id'] = cat['id']
else:
outcat['id'] = np.arange(nstars)+1
outcat['height'] = pars[0::3]
outcat['height_error'] = perror[0::3]
outcat['x'] = pars[1::3]
outcat['x_error'] = perror[1::3]
outcat['y'] = pars[2::3]
outcat['y_error'] = perror[2::3]
outcat['sky'] = gf.starsky
outcat['flux'] = outcat['height']*psf.fwhm()
outcat['flux_error'] = outcat['height_error']*psf.fwhm()
outcat['mag'] = -2.5*np.log10(np.maximum(outcat['flux'],1e-10))+25.0
outcat['mag_error'] = (2.5/np.log(10))*outcat['flux_error']/outcat['flux']
outcat['niter'] = gf.starniter # what iteration it converged on
outcat = Table(outcat)
# Relculate chi-squared and RMS of fit
for i in range(nstars):
xlist = gf.xlist0[i]
ylist = gf.ylist0[i]
flux = image.data[ylist,xlist].copy()
err = image.error[ylist,xlist]
xdata = (xlist,ylist)
model1 = psf(xlist,ylist,pars=[outcat['height'][i],outcat['x'][i],outcat['y'][i]])
chisq = np.sum((flux-outcat['sky'][i]-model1.ravel())**2/err**2)/len(xlist)
outcat['chisq'][i] = chisq
# chi value, RMS of the residuals as a fraction of the height
rms = np.sqrt(np.mean(((flux-outcat['sky'][i]-model1.ravel())/outcat['height'][i])**2))
outcat['rms'][i] = rms
# Image offsets for absolute X/Y coordinates
if absolute:
outcat['x'] += imx0
outcat['y'] += imy0
cat['x'] += imx0
cat['y'] += imy0
if verbose:
print('dt = %.2f sec' % (time.time()-start))
return outcat,model,gf.skyim