Source code for pydol.pipeline.jwst

from jwst.pipeline import Detector1Pipeline, Image2Pipeline, Image3Pipeline
import os
import multiprocessing as mp
from pathlib import Path
from crds import client


class jpipe:
    """Drive the JWST imaging pipeline (Detector1 -> Image2 -> Image3).

    Products are written under ``out_dir`` into ``stage1``, ``stage2`` and
    ``stage3`` sub-directories. Calling the instance runs every stage; a
    per-file Stage 1 or Stage 2 run is skipped when its expected product
    already exists on disk.
    """

    def __init__(
        self,
        input_files,
        out_dir=".",
        filter="",
        crds_context="jwst_1241.pmap",
        crds_dir=".",
        n_cores=None,
        **kwargs,
    ):
        """Configure the run, the CRDS environment and the output tree.

        Parameters
        ----------
        input_files : str, os.PathLike, or iterable of those
            Input exposures. Only files whose *name* contains ``"uncal"`` are
            processed by ``__call__``. A single path is wrapped in a list.
        out_dir : str or os.PathLike
            Root output directory; ``stage1``/``stage2``/``stage3`` are
            created beneath it.
        filter : str
            Passed to ``Image3Pipeline`` as ``output_file``.
        crds_context : str
            Value written to the ``CRDS_CONTEXT`` environment variable.
        crds_dir : str or os.PathLike
            Directory written to ``CRDS_PATH``; created if missing.
        n_cores : int or None
            Cores offered to the ``jump`` and ``ramp_fit`` steps. ``None``
            means all available cores; other values are clamped into
            ``[1, cpu_count()]``.
        **kwargs
            Overrides merged into ``self.config`` (``corr_1byf``,
            ``corr_snowball``, ``fit_by_channel``, ``background_method``).

        Raises
        ------
        ValueError
            If ``input_files`` is empty.
        """
        # ---------------- Input Files ---------------- #
        # Validate first so a bad call leaves the environment and the
        # filesystem untouched. A lone path is wrapped instead of being
        # iterated character by character; generators are materialised so
        # they can be walked more than once.
        if isinstance(input_files, (str, os.PathLike)):
            input_files = [input_files] if os.fspath(input_files) else []
        input_files = list(input_files or [])
        if not input_files:
            raise ValueError("Input files list CANNOT be empty!")
        self.input_files = input_files

        # ---------------- Configuration ---------------- #
        self.config = {
            "corr_1byf": False,
            "corr_snowball": True,
            "fit_by_channel": False,
            "background_method": "median",
        }
        self.config.update(kwargs)
        self.filter = filter

        # Core handling (STScI recommends controlled usage). A value of 0 or
        # less would hand the steps a meaningless core count, so clamp to 1.
        available_cores = mp.cpu_count()
        if n_cores is None:
            self.n_cores = available_cores
        else:
            self.n_cores = max(1, min(int(n_cores), available_cores))

        # ---------------- CRDS Setup ---------------- #
        crds_dir = Path(crds_dir)
        crds_dir.mkdir(parents=True, exist_ok=True)
        os.environ["CRDS_PATH"] = str(crds_dir)
        os.environ["CRDS_SERVER_URL"] = "https://jwst-crds.stsci.edu"
        os.environ["CRDS_CONTEXT"] = crds_context
        client.set_crds_server("https://jwst-crds.stsci.edu")

        # ---------------- Output Directories ---------------- #
        self.out_dir = Path(out_dir)
        for stage in ("stage1", "stage2", "stage3"):
            (self.out_dir / stage).mkdir(parents=True, exist_ok=True)

    def _product_path(self, filename, stage, old_suffix, new_suffix):
        """Return the path of the product written for *filename* in ``out_dir/stage``.

        Only the file name is rewritten: a trailing ``_<old_suffix>`` on the
        stem is replaced by ``_<new_suffix>`` (appended when absent) and the
        original extension is kept. Directory components of *filename* are
        discarded, so substrings such as ``"rate"`` in a folder name are
        never touched.
        """
        path = Path(filename)
        stem = path.stem
        old = "_" + old_suffix
        if stem.endswith(old):
            stem = stem[: -len(old)]
        return str(self.out_dir / stage / (stem + "_" + new_suffix + path.suffix))

    # ==========================================================
    # STAGE 1
    # ==========================================================
    def stage1_pipeline(self, filename):
        """Run ``Detector1Pipeline`` on one file, saving into ``out_dir/stage1``.

        ``jump`` and ``ramp_fit`` receive ``self.n_cores`` as
        ``maximum_cores``. ``clean_flicker_noise`` is skipped unless
        ``config["corr_1byf"]`` is true.
        """
        # STScI-supported internal parallelization
        steps_stage1 = {
            "jump": {
                "expand_large_events": self.config["corr_snowball"],
                "maximum_cores": str(self.n_cores),
            },
            "ramp_fit": {
                "maximum_cores": str(self.n_cores),
            },
            "clean_flicker_noise": {
                "skip": not self.config["corr_1byf"],
                "fit_by_channel": self.config["fit_by_channel"],
                "background_method": self.config["background_method"],
            },
        }
        Detector1Pipeline.call(
            filename,
            output_dir=str(self.out_dir / "stage1"),
            save_results=True,
            steps=steps_stage1,
        )

    # ==========================================================
    # STAGE 2
    # ==========================================================
    def stage2_pipeline(self, filename):
        """Run ``Image2Pipeline`` on one file, saving into ``out_dir/stage2``."""
        # Sequential execution (STScI recommendation)
        Image2Pipeline.call(
            filename,
            output_dir=str(self.out_dir / "stage2"),
            save_results=True,
        )

    # ==========================================================
    # STAGE 3
    # ==========================================================
    def stage3_pipeline(self, filenames):
        """Run ``Image3Pipeline`` once on *filenames*, saving into ``out_dir/stage3``.

        ``self.filter`` is passed as ``output_file``.
        """
        # Stage 3 is memory heavy - run once per association
        Image3Pipeline.call(
            filenames,
            output_file=self.filter,
            output_dir=str(self.out_dir / "stage3"),
            save_results=True,
        )

    # ==========================================================
    # MASTER CALL
    # ==========================================================
    def __call__(self):
        """Run Stage 1, Stage 2 and Stage 3 over ``self.input_files``.

        Stage 1 runs for each ``uncal`` input whose ``_rate`` product is not
        yet in ``out_dir/stage1``. Stage 2 runs for each rate product whose
        ``_cal`` product is not yet in ``out_dir/stage2``. Stage 3 then runs
        once on all expected ``_cal`` products, if there are any.
        """
        # ---------------- Stage 1 ---------------- #
        # Match on the file name only; str() also accepts Path inputs.
        uncal_files = [
            str(f) for f in self.input_files if "uncal" in Path(f).name
        ]
        rate_files = []
        for uncal in uncal_files:
            rate = self._product_path(uncal, "stage1", "uncal", "rate")
            rate_files.append(rate)
            if not Path(rate).exists():
                self.stage1_pipeline(uncal)

        # ---------------- Stage 2 ---------------- #
        cal_files = [
            self._product_path(rate, "stage2", "rate", "cal") for rate in rate_files
        ]
        rate_files_to_run = [
            rate for rate, cal in zip(rate_files, cal_files) if not Path(cal).exists()
        ]
        # STScI recommends NOT using multiprocessing for Stage 2
        for rate in rate_files_to_run:
            self.stage2_pipeline(rate)

        # ---------------- Stage 3 ---------------- #
        if cal_files:
            self.stage3_pipeline(cal_files)