# Source code for CPAC.utils.test_init

# CPAC/utils/test_init.py

# Copyright (C) 2015-2024  C-PAC Developers

# This file is part of C-PAC.

# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.

# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.

# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
#
# Contributing authors (please append):
# Daniel Clark
# Jon Clucas
"""Assist in initializing CPAC tests resources."""

from typing import Optional

from nipype.interfaces.utility import IdentityInterface

from CPAC.pipeline.nipype_pipeline_engine import Node
from CPAC.utils.monitoring import UTLOGGER


def create_dummy_node(name: str, fields: Optional[list[str]] = None):
    """Create a dummy IdentityInterface source for upstream resources.

    Parameters
    ----------
    name : str
        a name for the dummy Node

    fields : list of str, optional
        a list of resources to be present in the created Node. If not
        provided, the only resource will be called 'resource'

    Returns
    -------
    Node
    """
    # Default to a single resource named 'resource' when no fields are given
    fields = ["resource"] if fields is None else fields
    return Node(IdentityInterface(fields=fields), name=name)
# Return tests data config file
def populate_template_config(config_type: str) -> str:
    """Populate a template config file from CPAC_RESOURCE_DIR with actual filepaths.

    Parameters
    ----------
    config_type : str
        config file to populate; accepts 'data_config' and 'pipeline_config'

    Returns
    -------
    config_test : str
        filepath to the newly written config file for testing

    Raises
    ------
    ValueError
        if ``config_type`` is not a supported config type
    """
    # Import packages
    import os

    # Init variables
    resource_dir = return_resource_dir()
    templates_dir = return_resource_subfolder("templates")
    yamls = ["data_config", "pipeline_config"]

    # Check config type is supported, otherwise raise
    if config_type not in yamls:
        err_msg = "config_type parameter: %s is unsupported" % config_type
        raise ValueError(err_msg)
    ext = ".yml"
    out_name = "configs"

    # Get template and output paths
    template_path = os.path.join(templates_dir, config_type + ext)
    output_dir = return_resource_subfolder(out_name)
    output_path = os.path.join(output_dir, config_type + ext)

    # Replace 'RESOURCE_DIR' string with actual directory.
    # Context managers guarantee both files are closed even if writing fails
    # (the original left them open on error).
    with open(template_path, "r") as tmp_f, open(output_path, "w") as out_f:
        for line in tmp_f:
            out_f.write(line.replace("RESOURCE_DIR", resource_dir))

    # Return filepath
    return output_path
# Populate all of the template paths
def populate_all_templates():
    """Populate all of the template files.

    Returns
    -------
    None
    """
    config_types = [
        "data_config",
        "pipeline_config",
        "centrality_spec",
        "map_spec",
        "mask_spec",
        "roi_spec",
        "seed_spec",
        "spatial_maps_spec",
    ]

    # Populate each config template with actual paths
    outputs = [populate_template_config(config_type) for config_type in config_types]

    # Sanity-check that every template produced an output path
    if len(outputs) != len(config_types):
        err_msg = "Something went wrong during template population"
        raise Exception(err_msg)
    UTLOGGER.info("Successfully populated and saved templates!")
# Get the AWS credentials
def return_aws_creds():
    """Return the AWS credentials file given by the CPAC_AWS_CREDS environment variable.

    Returns
    -------
    aws_creds : string
        filepath to the AWS credentials with access key id and secret access key
    """
    # Import packages
    import os

    creds_path = os.getenv("CPAC_AWS_CREDS")
    if creds_path:
        return creds_path

    # Variable unset: tell the user and prompt for the path interactively
    UTLOGGER.error(
        "CPAC_AWS_CREDS environment variable not set!\n"
        "Set this to the filepath location of your AWS credentials."
    )
    return input("Enter path to AWS credentials file: ")
# Get the default test bucket name
def default_bucket_name():
    """Return the default S3 bucket name used in test suite.

    Returns
    -------
    bucket_name : string
        default S3 bucket name for testing
    """
    # The FCP-INDI public bucket hosts the test resources
    bucket_name = "fcp-indi"
    return bucket_name
# Return bucket name

# Grab all nifti files within directory
def return_all_niis(base_dir):
    """Walk through a base directory and all subsequent files.

    Return the filepaths of all nifti files found.

    Parameters
    ----------
    base_dir : string
        filepath to the base directory to search through

    Returns
    -------
    nii_list : list
        a list of filepath strings of the nifti files found in base_dir
    """
    # Import packages
    import os

    nii_list = []

    # Recursively collect any *.nii.gz files
    for root, _dirs, files in os.walk(base_dir):
        for fname in files:
            if fname.endswith(".nii.gz"):
                nii_list.append(os.path.join(root, fname))

    return nii_list
# Download the CPAC resource dir from S3
def download_cpac_resources_from_s3(local_base):
    """Download the CPAC testing resources directory from S3.

    Parameters
    ----------
    local_base : string
        the local directory to save the 'cpac_resources' contents
    """
    # Import packages
    import os

    from indi_aws import aws_utils, fetch_creds

    # Init variables
    resource_folder = "cpac_resources"
    s3_prefix = os.path.join("data/test_resources", resource_folder)
    bucket = fetch_creds.return_bucket(None, default_bucket_name())

    # Walk every object under the resources prefix
    for s3_obj in bucket.objects.filter(Prefix=s3_prefix):
        key = s3_obj.key

        # Folder placeholder keys carry no data
        if key.endswith("/"):
            continue

        # Map the S3 key onto the local filesystem under local_base
        local_path = os.path.join(
            local_base, key.split(resource_folder)[-1].lstrip("/")
        )
        local_dir = os.path.dirname(local_path)
        if not os.path.exists(local_dir):
            os.makedirs(local_dir)

        # Only fetch files that are not already present locally
        if not os.path.exists(local_path):
            bucket.download_file(
                key, local_path, Callback=aws_utils.ProgressPercentage(s3_obj)
            )

    UTLOGGER.info("CPAC resources folder in %s is complete!", local_base)
# Look for CPAC_RESOURCE_DIR to be in environment
def return_resource_dir():
    """Return the filepath of the CPAC_RESOURCE_DIR.

    Note the CPAC_RESOURCE_DIR environment variable must be set.

    Returns
    -------
    resource_dir : string
        the file path on disk where the cpac resources folder is
    """
    # Import packages
    import os

    resource_dir = os.getenv("CPAC_RESOURCE_DIR")

    # Variable unset: notify and prompt for a directory interactively
    if not resource_dir:
        UTLOGGER.error(
            "CPAC_RESOURCE_DIR environment variable not set! Enter directory of the"
            " cpac_resources folder.\n\n*If the folder does not exist, it will be"
            " downloaded under the directory specified."
        )
        resource_dir = input("Enter C-PAC resources directory: ")

    # Check and download any new or missing resources from S3 copy
    try:
        download_cpac_resources_from_s3(resource_dir)
    except Exception as exc:
        err_msg = (
            "There was a problem downloading the cpac_resources "
            "folder from S3.\nError: %s" % exc
        )
        raise Exception(err_msg)

    return resource_dir
# Return any subfolder of the resource directory
def return_resource_subfolder(subfolder):
    """Return a subfolder of the CPAC_RESOURCE_DIR.

    Parameters
    ----------
    subfolder : string
        subfolder name to return path of

    Returns
    -------
    resource_subfolder : string
        filepath to the resource subfolder
    """
    # Import packages
    import os

    resource_dir = return_resource_dir()

    # These folders live under the 'settings' directory of the resources tree
    in_settings = ["configs", "creds", "resources", "subject_lists", "templates"]
    if subfolder in in_settings:
        return os.path.join(resource_dir, "settings", subfolder)
    return os.path.join(resource_dir, subfolder)
# Return test strategies obj file
def return_strats_obj():
    """Return the file path of the strategies obj file from the CPAC_RESOURCE_DIR.

    Returns
    -------
    strats_obj : string
        filepath to the strategies obj file
    """
    # Import packages
    import os

    # The strategies obj lives in the 'resources' settings subfolder
    settings_dir = return_resource_subfolder("resources")
    return os.path.join(settings_dir, "strategies_test.obj")
# Return filepath

# Return tests subject list
def return_subject_list():
    """Return the file path of the subject list file from the CPAC_RESOURCE_DIR.

    Returns
    -------
    subject_list : string
        filepath to the subject list yaml file
    """
    # Import packages
    import os

    # The subject list lives in the 'subject_lists' settings subfolder
    config_dir = return_resource_subfolder("subject_lists")
    return os.path.join(config_dir, "CPAC_subject_list_test.yml")
# Return filepath

# Return the test subjects measure directories
def return_subj_measure_dirs(measure):
    """Grab the base directories of subject's output files for a given measure or workflow.

    Parameters
    ----------
    measure : string
        the measure or workflow or derivative of interest to parse for; this
        must be the folder name where all of the subject's test outputs are
        located (e.g. 'network_centrality')

    Returns
    -------
    subj_measure_dirs : list
        a list of strings of the base directories for each instance of the
        desired measure folder within the test subjects outputs
    """
    # Import packages
    import glob
    import os

    test_subj = return_test_subj()
    outputs_dir = return_resource_subfolder("output")

    # Layout: cpac_resources/output/<reg>/<subj_sess>/<scan>/<measure>/...
    pattern = os.path.join(outputs_dir, "*", "%s*" % test_subj, "*", measure)
    subj_measure_dirs = glob.glob(pattern)

    # Nothing matched: the measure outputs are missing for this subject
    if not subj_measure_dirs:
        err_msg = "Unable to find any subject directories for the %s measure." % measure
        raise Exception(err_msg)

    return subj_measure_dirs
# Get subject for individual tests
def return_test_subj():
    """Return the subject id.

    Note the CPAC_RESOURCE_DIR environment variable must be set.

    Returns
    -------
    resource_dir : string
        the file path on disk where the cpac resources folder is
    """
    # Import packages
    import os

    test_subj = os.getenv("CPAC_TEST_SUBJ")

    # List the available subject folders under site_1 of the input resources
    input_dir = return_resource_subfolder("input")
    site_dir = os.path.join(input_dir, "site_1")
    subs = os.listdir(site_dir)

    # Variable unset: prompt for a subject id interactively
    if not test_subj:
        UTLOGGER.error("CPAC_TEST_SUBJ environment variable not set!")
        test_subj = input("Enter C-PAC benchmark test subject id: ")

    # The chosen subject's input files must exist in the resources directory
    if test_subj not in subs:
        err_msg = (
            "Test subject %s is not in the cpac_resources subject "
            "directory %s. Please specify different CPAC_TEST_SUBJ."
            % (test_subj, site_dir)
        )
        raise Exception(err_msg)

    return test_subj
# Smooth nifti file
def smooth_nii_file(self, nii_file, fwhm, mask_file=None):
    """Gaussian smooth nifti files and optionally use a mask on the smoothed data.

    Parameters
    ----------
    nii_file : string
        filepath to the nifti file to smooth
    fwhm : float
        FWHM for Gaussian smoothing kernel, in mm
    mask_file : string (optional); default=None
        filepath to the mask file to use

    Returns
    -------
    smooth_arr : numpy.ndarray
        smoothed nifti image as a numpy array

    Raises
    ------
    Exception
        if the mask dimensions do not match the input nifti
    """
    # Import packages
    import numpy as np
    import nibabel as nib
    import scipy.ndimage

    # Init variables
    raw_nii = nib.load(nii_file)
    raw_arr = raw_nii.get_fdata()

    # Load and validate the mask, if one was provided
    mask_arr = None
    if mask_file:
        mask_arr = nib.load(mask_file).get_fdata()
        # Check the mask shape matches the raw nifti
        if mask_arr.shape != raw_arr.shape:
            err_msg = (
                "Mask file has different dimensions than nifti.\n"
                "Check the paths are correct and try again."
            )
            raise Exception(err_msg)

    # FWHM -> sigma in voxels: divide by 2*sqrt(2*ln(2)) ~= 2.3548,
    # then by the voxel resolution (mm) taken from the affine.
    # NOTE(review): assumes isotropic voxels (uses affine[0][0] only) — confirm
    mm_res = np.abs(raw_nii.affine[0][0])
    sigma = fwhm / 2.3548 / mm_res

    # Smooth input
    smooth_arr = scipy.ndimage.gaussian_filter(raw_arr, sigma, order=0)

    # Zero out voxels outside the mask.
    # BUGFIX: the original tested `if mask_arr:`, which raised a NameError when
    # no mask was given (mask_arr never bound) and an ambiguous-truth-value
    # ValueError when one was (truthiness of a multi-element array).
    # The per-coordinate loop is replaced with an equivalent vectorized
    # boolean-index assignment (same C-order element ordering).
    if mask_arr is not None:
        mask_bool = mask_arr.astype("bool")
        masked = np.zeros(mask_arr.shape, dtype=float)
        masked[mask_bool] = smooth_arr[mask_bool]
        smooth_arr = masked

    # Return the smoothed array
    return smooth_arr
def download_resource_from_s3(s3_url_path):
    """Download test resource from S3 bucket.

    Parameters
    ----------
    s3_url_path : string
        URL of the resource to download

    Returns
    -------
    dl_path : string
        local filepath of the downloaded resource, inside a fresh
        temporary directory
    """
    # Import packages
    import os
    import tempfile
    import urllib.request

    # Init variables
    temp_dir = tempfile.mkdtemp()
    base_name = os.path.basename(s3_url_path)
    dl_path = os.path.join(temp_dir, base_name)

    # Download file.
    # urllib.request.URLopener is deprecated since Python 3.3 and removed in
    # 3.14; urlretrieve performs the same fetch-to-file operation.
    urllib.request.urlretrieve(s3_url_path, dl_path)

    # Return the downloaded path
    return dl_path
# Setup log file
def setup_test_logger(logger_name, log_file, level, to_screen=False):
    """Initialize and configure a logger that can write to file and (optionally) the screen.

    Parameters
    ----------
    logger_name : string
        name of the logger
    log_file : string
        file path to the log file on disk
    level : integer
        indicates the level at which the logger should log; this is
        controlled by integers that come with the python logging
        package. (e.g. logging.INFO=20, logging.DEBUG=10)
    to_screen : boolean (optional)
        flag to indicate whether to enable logging to the screen

    Returns
    -------
    logger : logging.Logger object
        Python logging.Logger object which is capable of logging run-
        time information about the program to file and/or screen
    """
    # Import packages
    import logging

    from CPAC.utils.monitoring.custom_logging import getLogger

    # Shared timestamped format for every handler
    formatter = logging.Formatter("%(asctime)s : %(message)s")

    logger = getLogger(logger_name)
    logger.setLevel(level)

    # Always log to the file on disk
    file_handler = logging.FileHandler(log_file)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    # Optionally mirror log records to the console
    if to_screen:
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(formatter)
        logger.addHandler(stream_handler)

    # Return the logger
    return logger
def pearson_correlation(nii_1, nii_2):
    """Return the Pearson correlation between the voxel data of two nifti files."""
    import numpy as np
    import nibabel as nib

    # Flatten both images to 1D so corrcoef sees two variables
    flat_1 = nib.load(nii_1).get_fdata().flatten()
    flat_2 = nib.load(nii_2).get_fdata().flatten()
    return np.corrcoef(flat_1, flat_2)[0, 1]
# Calculate concordance correlation coefficient
def concordance(x, y):
    """Return the concordance correlation coefficient as defined by Lin (1989).

    Parameters
    ----------
    x : list or array
        a list or array of length N of numbers
    y : list or array
        a list or array of length N of numbers

    Returns
    -------
    rho_c : float
        the concordance value as a float

    Raises
    ------
    ValueError
        if either input is not 1D, or the two lengths differ
    """
    # Import packages
    import numpy as np

    # Usage errors check: both inputs must be 1D and of equal length
    x_shape = np.shape(x)
    y_shape = np.shape(y)
    if len(x_shape) != 1 or len(y_shape) != 1:
        raise ValueError("Inputs must be 1D lists or arrays.")
    if x_shape != y_shape:
        raise ValueError(
            "Length of the two inputs must be equal.\n"
            "Length of x: %d\nLength of y: %d" % (len(x), len(y))
        )

    x_arr = np.array(x).astype("float64")
    y_arr = np.array(y).astype("float64")

    # Lin's CCC: 2*rho*sx*sy / (sx^2 + sy^2 + (mx - my)^2)
    rho = np.corrcoef(x_arr, y_arr)[0][1]
    sigma_x, sigma_y = np.std(x_arr), np.std(y_arr)
    mu_x, mu_y = np.mean(x_arr), np.mean(y_arr)

    return (2 * rho * sigma_x * sigma_y) / (
        sigma_x**2 + sigma_y**2 + (mu_x - mu_y) ** 2
    )
# Return variables