Source code for CPAC.utils.extract_data

# Copyright (C) 2012-2024  C-PAC Developers

# This file is part of C-PAC.

# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.

# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.

# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
import glob
import logging
import os
from pathlib import Path
import string
import sys
from typing import BinaryIO, Optional

import yaml

logger = logging.getLogger("extract_data_logs")
# Detach any handlers left over from a previous configuration so the
# basicConfig call below installs a fresh file handler. Iterate over a COPY:
# calling removeHandler while looping over the live ``logger.handlers`` list
# mutates it mid-iteration and skips every other handler.
for handler in list(logger.handlers):
    logger.removeHandler(handler)
logging.basicConfig(
    filename=os.path.join(os.getcwd(), "extract_data_logs.log"),
    filemode="w",
    level=logging.DEBUG,
    format="%(levelname)s %(asctime)s %(lineno)d %(message)s",
)


def extract_data(c, param_map):
    """Generate a CPAC input subject list Python file.

    The method extracts anatomical and functional data for each site (if
    multiple site) and/or scan parameters for each site and put it into a
    data structure read by Python.

    Parameters
    ----------
    c : Configuration
        data-config object; must provide ``exclusionSubjectList``,
        ``subjectList``, ``siteList``, ``anatomicalTemplate``,
        ``functionalTemplate``, ``outputSubjectListLocation`` and
        ``subjectListName`` attributes.
    param_map : dict or None
        site -> scan-parameter-list mapping as returned by ``read_csv``, or
        None when no scan parameters CSV was supplied.

    Examples
    --------
    subjects_list =[
       {
        'subject_id' : '0050386',
        'unique_id' : 'session_1',
        'anat': '/Users/home/data/NYU/0050386/session_1/anat_1/anat.nii.gz',
        'rest':{
            'rest_1_rest' : '/Users/home/data/NYU/0050386/session_1/rest_1/rest.nii.gz',
            'rest_2_rest' : '/Users/home/data/NYU/0050386/session_1/rest_2/rest.nii.gz',
            }
        'scan_parameters':{
            'tr': '2',
            'acquisition': 'alt+z2',
            'reference': '17',
            'first_tr': '',
            'last_tr': '',
            }
        },
    ]

    or

    subjects_list =[
       {
        'subject_id' : '0050386',
        'unique_id' : 'session_1',
        'anat': '/Users/home/data/NYU/0050386/session_1/anat_1/anat.nii.gz',
        'rest':{
            'rest_1_rest' : '/Users/home/data/NYU/0050386/session_1/rest_1/rest.nii.gz',
            'rest_2_rest' : '/Users/home/data/NYU/0050386/session_1/rest_2/rest.nii.gz',
            }
        },
    ]
    """

    def get_list(arg) -> list:
        """Read each line of the file into list."""
        if isinstance(arg, list):
            return arg
        # context manager closes the handle instead of leaking it
        with open(arg, "r") as handle:
            return [fline.rstrip("\r\n") for fline in handle]

    exclusion_list = []
    if c.exclusionSubjectList is not None:
        exclusion_list = get_list(c.exclusionSubjectList)

    subject_list = []
    if c.subjectList is not None:
        subject_list = get_list(c.subjectList)

    def checkTemplate(template) -> None:
        """Check if `template` is correct (two '%s' slots, NIfTI extension)."""
        if template.count("%s") != 2:
            msg = (
                "Please provide '%s' in the template"
                "where your site and subjects are present"
                "Please see examples"
            )
            logger.exception(msg)
            raise Exception(msg)
        filename, ext = os.path.splitext(os.path.basename(template))
        # rebuild a compound extension such as ".nii.gz"
        ext = os.path.splitext(filename)[1] + ext
        if ext not in [".nii", ".nii.gz"]:
            msg = "Invalid file name", os.path.basename(template)
            logger.exception(msg)
            raise Exception(msg)

    def get_site_list(path):
        """List site directories under the part of `path` before the '%s' slot."""
        base, _relative = path.split("%s")
        return os.listdir(base)

    def check_length(scan_name, file_name):
        """Raise if the file name or derived scan name is too long.

        NOTE(review): the threshold below is 40 while the message says 20
        characters — inconsistent in the original; left as-is.
        """
        if len(file_name) > 30:
            msg = (
                "filename- %s is too long."
                "It should not be more than 30 characters." % (file_name)
            )
            logger.exception(msg)
            raise Exception(msg)
        if (
            len(scan_name) - len(os.path.splitext(os.path.splitext(file_name)[0])[0])
            >= 40
        ):
            msg = (
                "scan name %s is too long."
                "It should not be more than 20 characters"
                % (
                    scan_name.replace(
                        "_" + os.path.splitext(os.path.splitext(file_name)[0])[0], ""
                    )
                )
            )
            logger.exception(msg)
            raise Exception(msg)

    def create_site_subject_mapping(base, relative):
        """Create mapping between site and subject."""
        site_subject_map = {}
        base_path_list = []
        if c.siteList is not None:
            site_list = get_list(c.siteList)
        else:
            site_list = get_site_list(base)
        for site in site_list:
            # str.replace, not the py2-only string.replace module function
            paths = glob.glob(base.replace("%s", site))
            base_path_list.extend(paths)
            for path in paths:
                for sub in os.listdir(path):
                    # check if subject is present in subject_list
                    if subject_list:
                        if sub in subject_list and sub not in exclusion_list:
                            site_subject_map[sub] = site
                    elif sub not in exclusion_list:
                        # equality test; the original substring test
                        # (`sub not in ".DS_Store"`) wrongly excluded any
                        # subject whose name is a substring of ".DS_Store"
                        if sub != ".DS_Store":
                            site_subject_map[sub] = site
        return base_path_list, site_subject_map

    def getPath(template):
        """Split the input template path...

        ...into base, path before subject directory and relative, path after
        subject directory.
        """
        checkTemplate(template)
        base, relative = template.rsplit("%s", 1)
        base, subject_map = create_site_subject_mapping(base, relative)
        base.sort()
        relative = relative.lstrip("/")
        return base, relative, subject_map

    # get anatomical base path and anatomical relative path
    anat_base, anat_relative = getPath(c.anatomicalTemplate)[:2]
    # get functional base path, functional relative path and site-subject map
    func_base, func_relative, subject_map = getPath(c.functionalTemplate)

    if not anat_base:
        msg = (
            "Anatomical Data template incorrect. No such file or directory %s",
            anat_base,
        )
        logger.exception(msg)
        raise Exception(msg)

    if not func_base:
        msg = (
            "Functional Data template incorrect. No such file or directory %s",
            func_base,
        )
        logger.exception(msg)
        raise Exception(msg)

    if len(anat_base) != len(func_base):
        msg1 = (
            "Some sites are missing, Please check your template",
            anat_base,
            "!=",
            func_base,
        )
        logger.exception(msg1)
        msg2 = (
            " Base length Unequal. Some sites are missing."
            "extract_data doesn't script support this.Please"
            "Provide your own subjects_list file"
        )
        logger.exception(msg2)
        raise Exception(msg2)

    # calculate the length of relative paths(path after subject directory)
    func_relative_len = len(func_relative.split("/"))
    anat_relative_len = len(anat_relative.split("/"))

    def check_for_sessions(relative_path, path_length):
        """Check if there are sessions present."""
        # default
        session_present = False
        session_path = "session_1"
        # session present if path_length is equal to 3
        if path_length == 3:  # noqa: PLR2004
            relative_path_list = relative_path.split("/")
            session_path = relative_path_list[0]
            # str.join, not the py2-only string.join module function
            relative_path = "/".join(relative_path_list[1:])
            session_present = True
        elif path_length > 3:  # noqa: PLR2004
            msg = (
                "extract_data script currently doesn't support this directory structure."
                "Please provide the subjects_list file to run CPAC."
                "For more information refer to manual"
            )
            logger.exception(msg)
            raise Exception(msg)
        return session_present, session_path, relative_path

    func_session_present, func_session_path, func_relative = check_for_sessions(
        func_relative, func_relative_len
    )
    anat_session_present, anat_session_path, anat_relative = check_for_sessions(
        anat_relative, anat_relative_len
    )

    # text mode: the file is written with print(str, file=f); a "wb" handle
    # raises TypeError on Python 3
    f = open(
        os.path.join(
            c.outputSubjectListLocation,
            "CPAC_subject_list_%s.yml" % c.subjectListName,
        ),
        "w",
    )

    def fetch_path(i, anat_sub, func_sub, session_id):
        """
        Extract anatomical and functional path for a session and print to file.

        Parameters
        ----------
        i : int
            index of site
        anat_sub : string
            string containing subject/ concatenated
            subject-session path for anatomical file
        func_sub : string
            string containing subject/ concatenated
            subject-session path for functional file
        session_id : string
            session

        Raises
        ------
        Exception
        """
        try:
            # the leading spaces in each printed line form the YAML
            # indentation of the emitted subject list
            def print_begin_of_file(sub, session_id):
                print("-", file=f)
                print("    subject_id: '" + sub + "'", file=f)
                print("    unique_id: '" + session_id + "'", file=f)

            def print_end_of_file(sub):
                if param_map is not None:
                    try:
                        logger.debug(
                            "site for sub %s -> %s", sub, subject_map.get(sub)
                        )
                        logger.debug(
                            "scan parameters for the above site %s",
                            param_map.get(subject_map.get(sub)),
                        )
                        # hoist the per-site parameter lookup; indexing raises
                        # TypeError/IndexError when the site has no entry
                        params = param_map.get(subject_map.get(sub))
                        print("    scan_parameters:", file=f)
                        print("        tr: '" + params[4] + "'", file=f)
                        print("        acquisition: '" + params[0] + "'", file=f)
                        print("        reference: '" + params[3] + "'", file=f)
                        print("        first_tr: '" + params[1] + "'", file=f)
                        print("        last_tr: '" + params[2] + "'", file=f)
                    except (TypeError, IndexError, KeyError):
                        msg = (
                            " No Parameter values for the %s site is defined in the scan"
                            " parameters csv file" % subject_map.get(sub)
                        )
                        raise ValueError(msg)

            # get anatomical file
            anat_base_path = os.path.join(anat_base[i], anat_sub)
            func_base_path = os.path.join(func_base[i], func_sub)

            anat = glob.glob(os.path.join(anat_base_path, anat_relative))
            func = glob.glob(os.path.join(func_base_path, func_relative))
            if anat and func:
                print_begin_of_file(anat_sub.split("/")[0], session_id)
                print("    anat: '" + os.path.realpath(anat[0]) + "'", file=f)
                print("    rest: ", file=f)

                # iterate for each rest session
                for _iter in func:
                    # get scan_id: strip base path and (double) extension,
                    # then flatten the remaining path into one identifier
                    iterable = os.path.splitext(
                        os.path.splitext(
                            _iter.replace(func_base_path, "").lstrip("/")
                        )[0]
                    )[0]
                    iterable = iterable.replace("/", "_")
                    check_length(iterable, os.path.basename(os.path.realpath(_iter)))
                    print(
                        "      "
                        + iterable
                        + ": '"
                        + os.path.realpath(_iter)
                        + "'",
                        file=f,
                    )
                print_end_of_file(anat_sub.split("/")[0])
            else:
                logger.debug("skipping subject %s", anat_sub.split("/")[0])

        except ValueError as v_error:
            # py3: exceptions have no ``.message``; log the instance instead
            logger.exception(str(v_error))
            raise
        except Exception as e:
            err_msg = (
                "Exception while fetching anatomical and functional "
                "paths: \n" + str(e)
            )
            logger.exception(err_msg)
            raise Exception(err_msg)

    def walk(index, sub):
        """
        Walk across each subject path in the data site path.

        Parameters
        ----------
        index : int
            index of site
        sub : string
            subject_id

        Raises
        ------
        Exception
        """
        try:
            if func_session_present:
                # if there are sessions
                if "*" in func_session_path:
                    session_list = glob.glob(
                        os.path.join(
                            func_base[index], os.path.join(sub, func_session_path)
                        )
                    )
                else:
                    session_list = [func_session_path]

                if session_list:
                    for session in session_list:
                        session_id = os.path.basename(session)
                        if anat_session_present:
                            if func_session_path == anat_session_path:
                                fetch_path(
                                    index,
                                    os.path.join(sub, session_id),
                                    os.path.join(sub, session_id),
                                    session_id,
                                )
                            else:
                                fetch_path(
                                    index,
                                    os.path.join(sub, anat_session_path),
                                    os.path.join(sub, session_id),
                                    session_id,
                                )
                        else:
                            fetch_path(
                                index, sub, os.path.join(sub, session_id), session_id
                            )
                else:
                    logger.debug("Skipping subject %s", sub)
            else:
                logger.debug("No sessions")
                session_id = ""
                fetch_path(index, sub, sub, session_id)
        except Exception as error:
            err_msg = "Please make sessions are consistent across all subjects.\n\n"
            logger.exception(err_msg)
            raise Exception(err_msg) from error

    try:
        for i in range(len(anat_base)):
            for sub in os.listdir(anat_base[i]):
                # check if subject is present in subject_list
                if subject_list:
                    if sub in subject_list and sub not in exclusion_list:
                        logger.debug("extracting data for subject: %s", sub)
                        walk(i, sub)
                # check that subject is not in exclusion list
                elif sub not in exclusion_list and sub != ".DS_Store":
                    logger.debug("extracting data for subject: %s", sub)
                    walk(i, sub)
        # log the file actually written above (the original logged the
        # generic "CPAC_subject_list.yml" name, which is never created here)
        _name = os.path.join(
            c.outputSubjectListLocation,
            "CPAC_subject_list_%s.yml" % c.subjectListName,
        )
        logger.info(
            "Extraction Successfully Completed...Input Subjects_list for CPAC - %s",
            _name,
        )
    except Exception as error:
        logger.exception(str(error))
        raise
    finally:
        f.close()
def generate_supplementary_files(data_config_outdir, data_config_name):
    """Generate phenotypic template file and subject list for group analysis.

    Parameters
    ----------
    data_config_outdir : str
        directory containing the data configuration file; the generated
        phenotypic template CSV and participant list are written here too
    data_config_name : str
        filename of the data configuration YAML inside ``data_config_outdir``

    Raises
    ------
    OSError
        when the data configuration cannot be read or an output file cannot
        be written
    TypeError
        when the loaded subject list is malformed
    """
    import csv
    import os

    data_config_path = os.path.join(data_config_outdir, data_config_name)
    try:
        with open(data_config_path, "r") as config_file:
            subjects_list = yaml.safe_load(config_file)
    except OSError as error:
        # the original built this message and silently discarded it, then
        # crashed later on the undefined ``subjects_list``; raise it instead
        msg = f"\n\n[!] Data configuration file couldn't be read!\nFile path: {data_config_path}\n"
        raise OSError(msg) from error

    subject_scan_set = set()
    subID_set = set()
    session_set = set()
    subject_set = set()
    scan_set = set()

    try:
        for sub in subjects_list:
            if sub["unique_id"]:
                subject_id = sub["subject_id"] + "_" + sub["unique_id"]
            else:
                subject_id = sub["subject_id"]
            # newer configs use "func"; fall back to the legacy "rest" key
            try:
                for scan in sub["func"]:
                    subject_scan_set.add((subject_id, scan))
                    subID_set.add(sub["subject_id"])
                    session_set.add(sub["unique_id"])
                    subject_set.add(subject_id)
                    scan_set.add(scan)
            except KeyError:
                try:
                    for scan in sub["rest"]:
                        subject_scan_set.add((subject_id, scan))
                        subID_set.add(sub["subject_id"])
                        session_set.add(sub["unique_id"])
                        subject_set.add(subject_id)
                        scan_set.add(scan)
                except KeyError:
                    # one of the participants in the subject list has no
                    # functional scans
                    subID_set.add(sub["subject_id"])
                    session_set.add(sub["unique_id"])
                    subject_set.add(subject_id)
    except TypeError:
        err_str = (
            "Subject list could not be populated!\nThis is most likely due to a"
            " mis-formatting in your inclusion and/or exclusion subjects txt file or"
            " your anatomical and/or functional path templates.\nCheck formatting of"
            " your anatomical/functional path templates and inclusion/exclusion"
            " subjects text files"
        )
        raise TypeError(err_str)

    # NOTE(review): the original also built a ``data_list`` of per-scan
    # indicator rows here that was never written anywhere, and carried a large
    # inert triple-quoted block generating repeated-measures templates; both
    # were dead code and have been removed.

    # generate the phenotypic file template for group analysis
    file_name = os.path.join(
        data_config_outdir, "phenotypic_template_%s.csv" % data_config_name
    )
    try:
        # csv module requires a text-mode handle with newline=""; the old
        # binary-mode handle made csv.writer raise TypeError on Python 3
        with open(file_name, "w", newline="") as pheno_file:
            writer = csv.writer(pheno_file)
            writer.writerow(["participant", "EV1", ".."])
            for sub in sorted(subID_set):
                writer.writerow([sub, ""])
    except (OSError, TypeError):
        _sassy_oserror(file_name)
    logger.info("Template Phenotypic file for group analysis - %s", file_name)

    # generate the group analysis subject lists
    file_name = os.path.join(
        data_config_outdir, "participant_list_group_analysis_%s.txt" % data_config_name
    )
    try:
        with open(file_name, "w") as part_file:
            for sub in sorted(subID_set):
                print(sub, file=part_file)
    except (OSError, TypeError):
        _sassy_oserror(file_name)
    logger.info(
        "Participant list required later for group analysis - %s\n\n", file_name
    )
def read_csv(csv_input):
    """Read a scan-parameters CSV file into a site -> parameters mapping.

    Expected header columns (case-insensitive):
    'Acquisition'
    'Reference'
    'Site'
    'TR (seconds)'

    Parameters
    ----------
    csv_input : str
        path to the scan parameters CSV file

    Returns
    -------
    collections.defaultdict
        maps each site name to the row's values, ordered by the sorted
        lower-cased column names, excluding the 'site' and 'scan' columns

    Raises
    ------
    Exception
        if the file cannot be read, is empty, or is missing its header
    """
    from collections import defaultdict
    import csv

    try:
        # newline="" is the csv-module requirement; the original's "U" mode
        # was removed in Python 3.11. Context manager closes the handle.
        with open(csv_input, "r", newline="") as csv_file:
            reader = csv.DictReader(csv_file)
            dict_labels = defaultdict(list)
            for line in reader:
                csv_dict = {k.lower(): v for k, v in line.items()}
                dict_labels[csv_dict.get("site")] = [
                    csv_dict[key]
                    for key in sorted(csv_dict.keys())
                    if key not in ("site", "scan")
                ]

        if len(dict_labels) < 1:
            msg = "Scan Parameters File is either empty or missing header"
            logger.exception(msg)
            raise Exception(msg)
        return dict_labels
    except IOError:
        msg = "Error reading the csv file %s", csv_input
        logger.exception(msg)
        raise Exception(msg)
    except Exception:
        # narrowed from a bare except (which also swallowed KeyboardInterrupt)
        msg = "Error reading scan parameters csv. Make sure you are using the correct template"
        logger.exception(msg)
        raise Exception(msg)
def _sassy_oserror(file_name: str) -> None: """Raise a sassy OSError.""" msg = ( f"\n\nCPAC says: I couldn't save this file to your drive:\n {file_name}" "\n\nMake sure you have write access? Then come back. Don't worry.. I'll" " wait.\n\n" ) raise OSError(msg) def _sassy_try_open_wb(file_name: str) -> Optional[BinaryIO]: """Open a file in 'wb' mode or raise a sassy OSError if a file can't be saved.""" f = None try: f = open(file_name, "wb") except (OSError, TypeError): _sassy_oserror(file_name) return f
class Configuration(object):
    """Set dictionary keys as map attributes."""

    def __init__(self, config_map):
        # YAML configs may spell a null as the literal string "None";
        # normalize it in place (callers may observe the mutated map) and
        # mirror every entry onto this instance as an attribute.
        for key, value in list(config_map.items()):
            if value == "None":
                value = None
                config_map[key] = None
            setattr(self, key, value)
def run(data_config: Path | str) -> None:
    """Run a data config.

    Loads the data-config YAML, validates the scan-parameters CSV when one is
    configured, and generates the supplementary group-analysis files.

    Parameters
    ----------
    data_config : ~pathlib.Path or str
        path to data_config file
    """
    logger.info(
        "For any errors or messages check the log file - %s",
        os.path.join(os.getcwd(), "extract_data_logs.log"),
    )
    # context manager closes the config file (the original leaked the handle)
    with open(os.path.realpath(data_config), "r") as config_file:
        c = Configuration(yaml.safe_load(config_file))
    if c.scanParametersCSV is not None:
        read_csv(c.scanParametersCSV)
    else:
        logger.debug(
            "no scan parameters csv included\n"
            "make sure you turn off slice timing correction option\n"
            "in CPAC configuration\n"
        )
    generate_supplementary_files(c.outputSubjectListLocation, c.subjectListName)
if __name__ == "__main__":
    # Exactly one argument is expected: the data-config YAML path.
    if len(sys.argv) == 2:  # noqa: PLR2004
        run(sys.argv[1])
    else:
        print("Usage: python extract_data.py data_config.yml")  # noqa T201
        sys.exit()