#!/usr/bin/env python
"""CCDDATA.PY - Thin wrapper around CCDData class
"""
__authors__ = 'David Nidever <dnidever@montana.edu?'
__version__ = '20210908' # yyyymmdd
import sys
import time
import numpy as np
from astropy.nddata import CCDData as CCD,StdDevUncertainty
from astropy.wcs import WCS
from astropy.io import fits
from photutils.aperture import BoundingBox as BBox
from copy import deepcopy
from dlnpyutils import utils as dln
from . import sky as psky
[docs]def poissonnoise(data,gain=1.0,rdnoise=0.0):
""" Generate Poisson noise model (ala DAOPHOT)."""
# gain
# rdnoise
noise = np.sqrt(data/gain + rdnoise**2)
return noise
[docs]def getgain(image):
""" Get the gain from the header."""
gain = 1.0 # default
# check if there's a header
if hasattr(image,'meta'):
if image.meta is not None:
# Try all versions of gain
for f in ['gain','egain','gaina']:
hgain = image.meta.get(f)
if hgain is not None:
gain = hgain
break
return gain
[docs]def getrdnoise(image):
" Get the read noise from the header."""
rdnoise = 0.0 # default
# check if there's a header
if hasattr(image,'meta'):
if image.meta is not None:
# Try all versions of rdnoise
for f in ['rdnoise','readnois','enoise','rdnoisea']:
hrdnoise = image.meta.get(f)
if hrdnoise is not None:
rdnoise = hrdnoise
break
return rdnoise
[docs]class CCDData(CCD):
def __init__(self, data, *args, error=None, bbox=None, gain=None, rdnoise=None, sky=None,
copy=False, skyfunc=None, unit=None, **kwargs):
# Make sure the original version copies all of the input data
# otherwise bad things will happen when we convert to native byte-order
# Pull out error from arguments
if len(args)>0:
error = args[0]
if len(args)==1:
args = ()
else:
args = (None,*args[1:])
# Make sure we have units
if unit is None:
unit = 'adu'
# Initialize with the parent...
super().__init__(data, *args, copy=copy, unit=unit, **kwargs)
# Error
self._error = error
# Sky
self._sky = sky
# Sky estimation function
if skyfunc is not None:
self._skyfunc = skyfunc
else:
self._skyfunc = psky.sepsky
# Gain
self._gain = gain
# Read noise
self._rdnoise = rdnoise
# Copy
if copy:
if self._gain is not None:
self._gain = deepcopy(self._gain)
if self._rdnoise is not None:
self._rdnoise = deepcopy(self._rdnoise)
if self._error is not None:
self._error = deepcopy(self._error)
if self._sky is not None:
self._sky = deepcopy(self._sky)
self._skyfunc = deepcopy(self._skyfunc)
ndim = self.data.ndim
if ndim==0:
if bbox is None: bbox=BoundingBox(0,0,0,0)
self._bbox = bbox
self._x = None
self._y = None
elif ndim==1:
nx, = self.data.shape
if bbox is None: bbox=BoundingBox(0,nx,0,0)
self._bbox = bbox
self._x = np.arange(bbox.xrange[0],bbox.xrange[-1])
self._y = None
elif ndim==2:
ny,nx = self.data.shape
if bbox is None: bbox=BoundingBox(0,nx,0,ny)
self._bbox = bbox
self._x = np.arange(bbox.xrange[0],bbox.xrange[-1])
self._y = np.arange(bbox.yrange[0],bbox.yrange[-1])
else:
raise ValueError('3D CCDData not supported')
#self.native()
# for sep we need to ensure that the data is "c-contiguous"
# if we used a slice with no copy, then it won't be
# check image.data.flags['C_CONTIGUOUS']
# can make it c-contiguous by doing
# foo = foo.copy(order='C')
#maybe have CCDData have both a relative and absolute BoundingBox. The relative being of the
#last image that it was sliced from (what I'm using now), and the absolute one keeping track of
#the position in the *original* image.
def __repr__(self):
prefix = self.__class__.__name__ + '('
body = np.array2string(self.data, separator=', ', prefix=prefix)
out = ''.join([prefix, body, ')']) +'\n'
out += self.bbox.__repr__()
return out
# for the string representation also print out the bbox values
def __getitem__(self, item):
# Abort slicing if the data is a single scalar.
if self.data.shape == ():
raise TypeError('scalars cannot be sliced.')
# Single slice or integer
# make sure we have values for each dimension
if self.ndim==2 and type(item) is not tuple:
item = (item,slice(None,None,None))
# Let the other methods handle slicing.
kwargs = self._slice(item)
new = self.__class__(**kwargs)
# Deal with error
if self._error is not None:
new._error = self._error[item]
else:
new._error = None
# Deal with Sky
if self._sky is not None:
new._sky = self._sky[item]
else:
new._sky = None
# Gain and rdnoise
if self._gain is not None:
new._gain = deepcopy(self._gain)
if self._rdnoise is not None:
new._rdnoise = deepcopy(self._rdnoise)
# Get number of starting values and number of output elements
# 1-D
if self.ndim==1:
nx, = self.data.shape
# slice object
if isinstance(item,slice):
start1,stop1,step1 = item1.indices(nx)
nel = stop1-start1
start = start1
# Integer
else:
nel = 0
start = item
newx = self._x[item].copy()
if nel==0: # 0-D
new._x = np.array(newx)
return new
new._bbox = BoundingBox(newx[0],newx[-1]+1,0,0)
new._x = newx
new._y = None
# 2-D
elif self.ndim==2:
shape = self.data.shape
nel = np.zeros(2,int)
start = np.zeros(2,int)
for i in range(2):
item1 = item[i]
# Slice object
if isinstance(item1,slice):
start1,stop1,step1 = item1.indices(shape[i])
nel[i] = stop1-start1
start[i] = start1
# Integer
else:
nel[i] = 0
start[i] = item1
# python images are (Y,X)
newx = self._x[item[1]].copy()
newy = self._y[item[0]].copy()
# Deal with various output types
# 0-D output
if np.sum(nel)==0:
new._x = newx
new._y = newy
return new
# 1-D output
elif np.min(nel)==0:
rem, = np.where(nel==0) # which dimension got removed
if rem[0]==0:
new._bbox = BoundingBox(newy[0],newy[-1]+1,0,0)
new._x = newy
new._y = None
else:
new._bbox = BoundingBox(newx[0],newx[-1]+1,0,0)
new._x = newx
new._y = None
# 2-D output
else:
try:
new._bbox = BoundingBox(newx[0],newx[-1]+1,newy[0],newy[-1]+1)
except:
print('slicing problem')
import pdb; pdb.set_trace()
new._x = newx
new._y = newy
else:
raise ValueError('3D CCDData not supported')
return new
@property
def error(self):
""" Return the uncertainty."""
# if error not input
# estimate error from image plus gain
if self._error is None:
self._error = poissonnoise(self.data,self.gain,self.rdnoise)
return self._error
@property
def sky(self):
""" Return the sky."""
# estimate the sky
if self._sky is None:
self._sky = self._skyfunc(self)
return self._sky
@property
def gain(self):
""" Return the gain."""
if self._gain is None:
self._gain = getgain(self)
return self._gain
@property
def rdnoise(self):
""" Return the read noise."""
if self._rdnoise is None:
self._rdnoise = getrdnoise(self)
return self._rdnoise
@property
def bbox(self):
""" Boundary box."""
# Upper values are EXCLUSIVE as is normal in python!
return self._bbox
@property
def x(self):
""" X-array."""
return self._x
@property
def y(self):
""" Y-array."""
return self._y
@property
def min(self):
""" Calculate the min of the image data. Uses only unmasked data."""
if self.mask is not None:
return np.min(self.data[~self.mask])
else:
return np.min(self.data)
@property
def max(self):
""" Calculate the max of the image data. Uses only unmasked data."""
if self.mask is not None:
return np.max(self.data[~self.mask])
else:
return np.max(self.data)
@property
def mean(self):
""" Calculate the mean of the image data. Uses only unmasked data."""
if self.mask is not None:
return np.mean(self.data[~self.mask])
else:
return np.mean(self.data)
@property
def median(self):
""" Calculate the median of the image data. Uses only unmasked data."""
if self.mask is not None:
return np.median(self.data[~self.mask])
else:
return np.median(self.data)
@property
def std(self):
""" Calculate the standard deviation of the image data. Uses only unmasked data."""
if self.mask is not None:
return np.std(self.data[~self.mask])
else:
return np.std(self.data)
@property
def mad(self):
""" Calculate the MAD of the image data. Uses only unmasked data."""
if self.mask is not None:
return dln.mad(self.data[~self.mask])
else:
return dln.mad(self.data)
[docs] def isnative(self,data):
""" Check if data has native byte order."""
sys_is_le = sys.byteorder == 'little'
native_code = sys_is_le and '<' or '>'
# = is native
# | is for not applicable
return (data.dtype.byteorder == native_code) or (data.dtype.byteorder=='=') or (data.dtype.byteorder=='|')
[docs] def isccont(self,data):
""" Check if data is c-continuous."""
return data.flags['C_CONTIGUOUS']
[docs] def issepready(self,data):
""" Check if data is ready for sep (native byte order and c-continuous)."""
return self.isnative(data) & self.isccont(data)
[docs] def native(self):
""" Make sure that the arrays use native endian for sep."""
# Deal with byte order for sep
sys_is_le = sys.byteorder == 'little'
native_code = sys_is_le and '<' or '>'
# data
if self.data.dtype.byteorder != native_code:
self.data = self.data.byteswap(inplace=True).newbyteorder()
# error
if self._error is not None:
if self._error.dtype.byteorder != native_code:
self._error = self._error.byteswap(inplace=True).newbyteorder()
# mask
if self.mask is not None:
if self.mask.dtype.byteorder != native_code:
self.mask = self.mask.byteswap(inplace=True).newbyteorder()
# sky
if self._sky is not None:
if self._sky.dtype.byteorder != native_code:
self._sky = self._sky.byteswap(inplace=True).newbyteorder()
@property
def ccont(self):
""" Return C-Continuous data for data, error, mask, sky."""
return (self.sepready(self.data), self.sepready(self.error),
self.sepready(self.mask), self.sepready(self.sky))
#if self.data.flags['C_CONTIGUOUS']==False:
# data = self.data.copy(order='C')
#else:
# data = self.data
#if self.error.flags['C_CONTIGUOUS']==False:
# error = self.error.copy(order='C')
#else:
# error = self.error
#if self.mask is not None:
# if self.mask.flags['C_CONTIGUOUS']==False:
# mask = self.mask.copy(order='C')
# else:
# mask = self.mask
#else:
# mask = self.mask
#if self.sky.flags['C_CONTIGUOUS']==False:
# sky = self.sky.copy(order='C')
#else:
# sky = self.sky
return (data,error,mask,sky)
[docs] def sepready(self,data=None):
""" Return sep-ready data (native byte order and c-continuous)."""
# No data nput, return a sep-ready version of the
# the image object
if data is None:
# Check all data arrays
ready = True
for n in ['data','_error','mask','_sky']:
dat = getattr(self,n)
if dat is not None:
ready &= self.sepready(dat)
# Already sep-ready
if ready:
return self
# Not ready, get sep-ready versions of the data
new = self.copy()
for n in ['data','_error','mask','_sky']:
dat = getattr(new,n)
if dat is not None:
setattr(new,n,new.sepready(dat))
return new
# Data array input
else:
if self.issepready(data)==False:
new = data.copy(order='C')
if self.isnative(new)==False:
new = new.byteswap(inplace=True).newbyteorder()
return new
#return data.copy(order='C').byteswap(inplace=True).newbyteorder()
else:
return data
@property
def sepdata(self):
""" Return C-Continuous and native byte order data for sep."""
# Deal with byte order for sep
sys_is_le = sys.byteorder == 'little'
native_code = sys_is_le and '<' or '>'
# Loop over data types
out = []
for name in ['data','error','mask','_sky']:
data = getattr(self,name)
if data is not None:
# if not c-contiguous or not native byte order, copy + correct
if data.flags['C_CONTIGUOUS']==False or data.dtype.byteorder!=native_code:
out.append(data.copy(order='C').byteswap(inplace=True).newbyteorder())
else:
out.append(data)
else:
out.append(data)
return tuple(out)
[docs] def copy(self):
"""
Return a copy of the CCDData object.
"""
return self.__class__(self, copy=True, error=self._error, gain=self._gain, rdnoise=self._rdnoise)
[docs] def write(self,outfile,overwrite=True):
"""
Write the image data to a file.
Parameters
----------
outfile : str
The output filename.
overwrite : boolean, optional
Overwrite the file if it already exists. Default is True.
Example
-------
image.write('image.fits',overwrite=True)
"""
hdulist = fits.HDUList()
# HDU0: Data and header
hdulist.append(fits.PrimaryHDU(self.data,self.header))
hdulist[0].header['IMAGTYPE'] = 'Prometheus'
# HDU1: error
hdulist.append(fits.ImageHDU(self.error))
hdulist[1].header['BUNIT'] = 'Uncertainty'
# HDU2: mask
if self.mask is None:
hdulist.append(fits.ImageHDU(self.mask))
else:
hdulist.append(fits.ImageHDU(self.mask.astype(int)))
hdulist[2].header['BUNIT'] = 'Mask'
# HDU3: flags
hdulist.append(fits.ImageHDU(self.flags))
hdulist[3].header['BUNIT'] = 'Flags'
# HDU4: sky
hdulist.append(fits.ImageHDU(self.sky))
hdulist[4].header['BUNIT'] = 'Sky'
hdulist.writeto(outfile,overwrite=overwrite)
hdulist.close()
[docs] def tohdu(self):
"""
Convert the image to an HDU so it can be written to a file.
Note that only the image data is converted to the HDU
(no error, mask, flags or sky).
Returns
-------
hdu : fits HDU object
The FITS HDU object.
Example
-------
hdu = image.tohdu()
"""
# Data and header
if len(self.header)>0:
hdu = fits.PrimaryHDU(self.data,self.header)
else:
hdu = fits.PrimaryHDU(self.data)
hdu.header['IMAGTYPE'] = 'Prometheus'
return hdu
[docs] @classmethod
def read(cls,filename):
""" Read in an image from file."""
hdulist = fits.open(filename)
nhdu = len(hdulist)
# HDU0: Data and header
data = hdulist[0].data
head = hdulist[0].header
# HDU1: error
if nhdu>1:
error = hdulist[1].data
ehead = hdulist[1].header
else:
error = None
# HDU2: mask
if nhdu>2:
mask = hdulist[2].data
mhead = hdulist[2].header
else:
mask = None
# HDU3: flags
if nhdu>3:
flags = hdulist[3].data
fhead = hdulist[3].header
else:
flags = None
# HDU4: sky
if nhdu>4:
sky = hdulist[4].data
shead = hdulist[4].header
else:
sky = None
hdulist.close()
# Make WCS, this doesn't capture the PV#_# values
w = WCS(head)
# Units
unit = head.get('bunit')
if unit=='ADU':
unit = 'adu'
if unit is None:
unit = 'adu'
# make the ccddata object
image = CCDData(data,error,mask=mask,meta=head,flags=flags,sky=sky,wcs=w,unit=unit)
return image
[docs]class BoundingBox(BBox):
def __init__(self, *args, **kwargs):
# Initialize with the parent...
super().__init__(*args, **kwargs)
@property
def xrange(self):
return (self.ixmin,self.ixmax)
@property
def yrange(self):
return (self.iymin,self.iymax)
@property
def data(self):
return [(self.ixmin,self.ixmax),(self.iymin,self.iymax)]
def __getitem__(self,item):
return self.data[item]
def __array__(self):
return np.array(self.data)
@property
def slices(self):
""" Return the slices."""
return (slice(self.iymin,self.iymax,None),
slice(self.ixmin,self.ixmax,None))