Source code for pydol.photometry.acs

import os
from glob import glob
from astropy.table import Table
from astropy.wcs import WCS
from astropy.io import fits
import numpy as np
import multiprocessing as mp
from pathlib import Path
import subprocess
import pandas as pd
import astropy.units as u
from astropy.coordinates import angular_separation
from .scripts.catalog_filter import box

param_dir_default = str(Path(__file__).parent.joinpath('params'))
script_dir = str(Path(__file__).parent.joinpath('scripts'))

[docs] def acs_phot( flt_files, filter='f435w', output_dir='.', drz_path='.', cat_name='', param_file=None, sharp_cut=0.2, crowd_cut=2.25, obj_type=2, SNR_min=5, quality_filters = "f814w" ): """ HST ACS photometry using DOLPHOT (HPC-friendly version) Parameters ---------- flt_files : list List of ACS _flt.fits files filter : str output_dir : str drz_path : str Path (without .fits extension) to drz image cat_name : str param_file : str or None sharp_cut : float crowd_cut : float """ if len(flt_files) < 1: raise ValueError("flt_files cannot be EMPTY") os.makedirs(output_dir, exist_ok=True) with fits.open(f"{drz_path}.fits") as hdul: header = hdul[0].header if 'DOL_ACS' not in header: subprocess.run(["acsmask", f"{drz_path}.fits"], check=True) if param_file is None or not os.path.exists(param_file): print("Using default DOLPHOT params") edit_params = True param_file = os.path.join(param_dir_default, "acs_dolphot.param") else: edit_params = False out_id = filter + cat_name # ------------------------------------------------------- # Prepare exposures # ------------------------------------------------------- if edit_params: exps = [] for f in flt_files: out_dir = os.path.basename(f).split('.')[0] exp_dir = os.path.join(output_dir, out_dir) os.makedirs(exp_dir, exist_ok=True) data_fits = os.path.join(exp_dir, "data.fits") if not os.path.exists(data_fits): subprocess.run(["cp", f, data_fits], check=True) exps.append(exp_dir) print("Running ACSMASK, SPLITGROUPS, CALCSKY...") for exp_dir in exps: chip1 = os.path.join(exp_dir, "data.chip1.sky.fits") chip2 = os.path.join(exp_dir, "data.chip2.sky.fits") if not (os.path.exists(chip1) and os.path.exists(chip2)): subprocess.run(["acsmask", f"{exp_dir}/data.fits"], check=True) subprocess.run(["splitgroups", f"{exp_dir}/data.fits"], check=True) subprocess.run( ["calcsky", f"{exp_dir}/data.chip1", "15", "35", "4", "2.25", "2.00"], check=True ) subprocess.run( ["calcsky", f"{exp_dir}/data.chip2", "15", "35", "4", "2.25", "2.00"], check=True ) # ------------------------------------------------------- # Rewrite param file # ------------------------------------------------------- with open(param_file) as f: dat = f.readlines() dat[0] = f"Nimg = {2 * len(exps)}\n" dat[4] = f"img0_file = {drz_path}\n" dat[5] = "" for i, exp_dir in enumerate(exps): dat[5] += f"img{2*i+1}_file = {exp_dir}/data.chip1\n" dat[5] += f"img{2*i+2}_file = {exp_dir}/data.chip2\n" param_file = os.path.join(output_dir, f"acs_dolphot_{out_id}.param") with open(param_file, "w") as f: f.writelines(dat) # ------------------------------------------------------- # Run DOLPHOT # ------------------------------------------------------- out_fits = os.path.join(output_dir, f"{out_id}_photometry.fits") if not os.path.exists(out_fits): print("Running DOLPHOT...") process = subprocess.Popen( ["dolphot", f"{output_dir}/out", f"-p{param_file}"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True ) for line in process.stdout: print(line, end="") process.wait() if process.returncode != 0: raise RuntimeError("DOLPHOT failed.") else: print(f"{out_fits} already exists.") # ------------------------------------------------------- # Convert to FITS table # ------------------------------------------------------- subprocess.run( ["python", f"{script_dir}/to_table.py", "--o", f"{out_id}_photometry", "--f", f"{output_dir}/out", "--d", "ACS"], check=True ) phot_table = Table.read(out_fits) # ------------------------------------------------------- # Add RA/Dec # ------------------------------------------------------- with fits.open(drz_path+'.fits') as hdu: wcs = WCS(hdu[0].header) ra, dec = wcs.pixel_to_world_values( phot_table["x"] - 0.5, phot_table["y"] - 0.5 ) phot_table["ra"] = ra phot_table["dec"] = dec # ------------------------------------------------------- # Quality filtering # ------------------------------------------------------- mask = np.ones(len(phot_table), dtype=bool) for filt in quality_filters.split("_"): filt = filt.upper() mask &= ( (phot_table[f"sharpness_{filt}"]**2 <= sharp_cut) & (phot_table[f"crowd_{filt}"] <= crowd_cut) & (phot_table[f"SNR_{filt}"] >= SNR_min) ) mask &= phot_table["type"] <= obj_type phot_table_filt = phot_table[mask].copy() phot_table.write(out_fits, overwrite=True) phot_table_filt.write(f"{output_dir}/{out_id}_photometry_filt.fits", overwrite=True) print("ACS Stellar Photometry Completed!")
[docs] def acs_phot_comp( param_file=None, m=[20], filter='f814w', region_name='3', output_dir='.', tab_path='.', ref_img_path=None, cat_name='', sharp_cut=0.2, crowd_cut=2.25, SNR_min=5, obj_type=2, ra_col='ra', dec_col='dec', ra=0, dec=0, shape='box', width=24/3600, height=24/3600, ang=245, nx=10, ny=10, quality_filters = 'f814w' ): """ ACS completeness analysis using DOLPHOT (HPC-optimized version) """ if param_file is None or not os.path.exists(param_file): raise ValueError("param_file cannot be EMPTY") os.makedirs(output_dir, exist_ok=True) out_id = filter + cat_name # ------------------------------------------------------- # Region selection # ------------------------------------------------------- tab = Table.read(tab_path) if shape == 'box': tab_n = box(tab, ra_col, dec_col, ra, dec, 0, 0, width, height, angle=ang) elif shape == 'circle': tab_n = tab.copy() tab_n['r'] = angular_separation( tab[ra_col]*u.deg, tab[dec_col]*u.deg, ra*u.deg, dec*u.deg ).to(u.deg).value tab_n = tab_n[tab_n['r'] <= width] x_cen = 0.5 * (tab_n['x'].min() + tab_n['x'].max()) y_cen = 0.5 * (tab_n['y'].min() + tab_n['y'].max()) r_pix_max = np.sqrt( (tab_n['x'] - x_cen)**2 + (tab_n['y'] - y_cen)**2 ).max() # ------------------------------------------------------- # Fake star grid # ------------------------------------------------------- xvals = tab_n['x'] yvals = tab_n['y'] xx, yy = np.meshgrid( np.linspace(xvals.min() + 10, xvals.max() - 10, nx), np.linspace(yvals.min() + 10, yvals.max() - 10, ny) ) x = xx.ravel().astype(int) y = yy.ravel().astype(int) if shape == 'circle': r_pix = np.sqrt((x - x_cen)**2 + (y - y_cen)**2) ind = r_pix <= r_pix_max x = x[ind] y = y[ind] ext = np.zeros_like(x) chip = np.ones_like(x) mags = np.array([np.full(x.shape, mag_) for mag_ in m]).T data = { 'ext': ext, 'chip': chip, 'x': x, 'y': y } for i in range(len(m)): data[f'mag_{i}'] = mags[:, i] mag_string = "_".join(map(str, m)) fake_file = os.path.join( output_dir, f"fake_{region_name}_{mag_string}_{out_id}.txt" ) pd.DataFrame(data).to_csv( fake_file, sep=' ', index=False, header=False ) # ------------------------------------------------------- # Modify param file # ------------------------------------------------------- with open(param_file) as f: dats = f.readlines() for n, line in enumerate(dats): if 'FakeStars' in line: break dats[n] = f"FakeStars = {fake_file}\n" dats[n+1] = ( f"FakeOut = " f"{output_dir}/fake_{region_name}_{mag_string}_{out_id}.fake\n" ) param_file_new = param_file.replace( '.param', f'_{region_name}.param' ) with open(param_file_new, 'w') as f: f.writelines(dats) # ------------------------------------------------------- # Run DOLPHOT fake stars # ------------------------------------------------------- phot_file = os.path.join(output_dir, f"{out_id}_photometry.fits") if not os.path.exists(phot_file): print(f"{phot_file} NOT FOUND!!") return print("Running DOLPHOT fake star test...") process = subprocess.Popen( ["dolphot", f"{output_dir}/out", f"-p{param_file_new}"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True ) for line in process.stdout: print(line, end="") process.wait() if process.returncode != 0: raise RuntimeError("DOLPHOT fake star run failed.") # ------------------------------------------------------- # Convert fake output # ------------------------------------------------------- subprocess.run([ "python", f"{script_dir}/to_table_fake.py", "--f", f"{output_dir}/fake_{region_name}_{mag_string}_{out_id}.fake", "--d", "ACS", "--c", f"{output_dir}/out.columns", "--o", f"fake_out_{region_name}_{mag_string}_{out_id}" ], check=True) fake_out_fits = os.path.join( output_dir, f"fake_out_{region_name}_{mag_string}_{out_id}.fits" ) phot_table = Table.read(fake_out_fits) # ------------------------------------------------------- # Add RA/Dec if requested # ------------------------------------------------------- if ref_img_path is not None: with fits.open(ref_img_path) as hdu: wcs = WCS(hdu[0].header) ra, dec = wcs.pixel_to_world_values( phot_table["x"] - 0.5, phot_table["y"] - 0.5 ) phot_table["ra"] = ra phot_table["dec"] = dec # ------------------------------------------------------- # Filtering # ------------------------------------------------------- mask = np.ones(len(phot_table), dtype=bool) for filt in quality_filters.split("_"): filt = filt.upper() mask &= ( (phot_table[f"sharpness_{filt}"]**2 <= sharp_cut) & (phot_table[f"crowd_{filt}"] <= crowd_cut) & (phot_table[f"SNR_{filt}"] >= SNR_min) ) mask &= phot_table["type"] <= obj_type phot_table_filt = phot_table[mask] phot_table.write(fake_out_fits, overwrite=True) phot_table_filt.write(f"{output_dir}/fake_out_{region_name}_{mag_string}_{out_id}_filt.fits", overwrite=True) print("ACS Completeness Completed!")