# Copyright (C) 2016-2024  C-PAC Developers
# This file is part of C-PAC.
# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
import json
import os
import re
import sys
from warnings import warn
from botocore.exceptions import BotoCoreError
import yaml
from CPAC.utils.monitoring import UTLOGGER
[docs]
def bids_decode_fname(file_path, dbg=False, raise_error=True):
    """Decode a BIDS-style file path into a dictionary of its components.

    Parameters
    ----------
    file_path : str
        ``/``-separated path to a NIfTI or JSON file
    dbg : bool
        emit debug log messages while parsing
    raise_error : bool
        if True, raise ``ValueError`` when the filename has no scan type
        or is a BOLD file without a task; if False, log the problem as
        an error and return what was decoded

    Returns
    -------
    dict
        ``key: value`` for every ``key-value`` chunk of the filename, plus
        ``'scantype'`` (the last underscore-delimited chunk without a
        hyphen) and ``'site'`` (the directory preceding the first
        ``sub-`` directory, or the last directory if there is no ``sub-``
        directory, or ``'none'``; whitespace, hyphens and underscores are
        stripped from it)

    Raises
    ------
    IOError
        if the filename contains neither ``nii`` nor ``json``
    ValueError
        if ``raise_error`` is True and the filename fails validation
    """
    f_dict = {}
    fname = os.path.basename(file_path)

    # first lets make sure that we know how to handle the file
    lowered = fname.lower()
    if "nii" not in lowered and "json" not in lowered:
        msg = f"File ({fname}) does not appear to be a nifti or json file"
        raise IOError(msg)

    if dbg:
        UTLOGGER.debug("parsing %s", file_path)

    # first figure out if there is a site directory level, this isn't
    # specified in BIDS currently, but hopefully will be in the future
    file_path_vals = os.path.dirname(file_path).split("/")
    sub = [s for s in file_path_vals if "sub-" in s]
    if dbg:
        UTLOGGER.debug("found subject %s in %s", sub, file_path_vals)

    if len(sub) > 1:
        UTLOGGER.debug(
            "Odd that there is more than one subject directory in (%s), does the"
            " filename conform to BIDS format?",
            file_path,
        )

    site = "none"
    if sub:
        sub_ndx = file_path_vals.index(sub[0])
        if sub_ndx > 0 and file_path_vals[sub_ndx - 1]:
            if dbg:
                UTLOGGER.debug("setting site to %s", file_path_vals[sub_ndx - 1])
            site = file_path_vals[sub_ndx - 1]
    elif file_path_vals[-1]:
        if dbg:
            UTLOGGER.debug(
                "looking for subject id didn't pan out settling for last subdir %s",
                file_path_vals[-1],
            )
        site = file_path_vals[-1]
    f_dict["site"] = re.sub(r"[\s\-\_]+", "", site)

    # drop every extension, then convert the filename string into a
    # dictionary to pull out the other key value pairs
    fname = fname.split(".")[0]
    for key_val_pair in fname.split("_"):
        if "-" in key_val_pair:
            # the chunk has the shape key-val: everything after the first
            # hyphen is the value
            key, _, value = key_val_pair.partition("-")
            f_dict[key] = value
        else:
            f_dict["scantype"] = key_val_pair

    # validate what we decoded; a missing and an empty scan type are
    # reported identically
    scantype = f_dict.get("scantype")
    msg = None
    if not scantype:
        msg = (
            f"Filename ({fname}) does not appear to contain"
            " scan type, does it conform to the BIDS format?"
        )
    elif "bold" in scantype and not f_dict.get("task"):
        # .get(): a BOLD file without any task entity must be reported
        # through the branch below rather than escaping as a KeyError
        msg = (
            f"Filename ({fname}) is a BOLD file, but doesn't contain a task, does"
            " it conform to the BIDS format?"
        )
    if msg is not None:
        if raise_error:
            raise ValueError(msg)
        UTLOGGER.error(msg)

    return f_dict
[docs]
def bids_entities_from_filename(filename):
    """Split a BIDS filename into its underscore-delimited entities.

    Any leading directories and everything from the first ``.`` of the
    basename onward are discarded before splitting.

    Parameters
    ----------
    filename : str

    Returns
    -------
    entities : list

    Examples
    --------
    >>> bids_entities_from_filename(
    ...     's3://fake/data/sub-0001/ses-NFB3/func/'
    ...     'sub-0001_ses-NFB3_task-MSIT_bold.nii.gz')
    ['sub-0001', 'ses-NFB3', 'task-MSIT', 'bold']
    """
    # keep only what follows the final "/" (the whole string if none)
    basename = filename.rsplit("/", 1)[-1]
    # strip every extension in one go
    stem = basename.split(".", 1)[0]
    return stem.split("_")
[docs]
def bids_match_entities(file_list, entities, suffix):
    """Function to subset a list of filepaths by a passed BIDS entity.

    Files whose suffix differs from ``suffix`` are passed through
    unfiltered; files with the given suffix are kept only if ``entities``
    appears in the filename delimited by underscores on both sides.

    Parameters
    ----------
    file_list : list of str
    entities : str
        BIDS entities joined by underscores (e.g., 'ses-001_task-PEER1')
    suffix : str
        BIDS suffix (e.g., 'bold', 'T1w')

    Returns
    -------
    list of str

    Raises
    ------
    LookupError
        if ``file_list`` is not empty but nothing in it is kept; the
        message lists near-matches when there are any

    Examples
    --------
    >>> bids_match_entities([
    ...     's3://fake/data/sub-001_ses-001_task-MSIT_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER2_bold.nii.gz'
    ... ], 'task-PEER1', 'bold')
    ['s3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz']
    >>> bids_match_entities([
    ...     's3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER2_bold.nii.gz'
    ... ], 'PEER', 'bold')
    Traceback (most recent call last):
    LookupError: No match found for provided entity "PEER" in
    - s3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz
    - s3://fake/data/sub-001_ses-001_task-PEER2_bold.nii.gz
    Perhaps you meant one of these?
    - task-PEER1
    - task-PEER2
    """
    needle = f"_{entities}_"
    matches = []
    for file in file_list:
        # parse each filename once instead of once per condition
        file_entities = bids_entities_from_filename(file)
        if file_entities[-1] != suffix or needle in "_".join(file_entities):
            matches.append(file)

    if file_list and not matches:
        pp_file_list = "\n".join([f"- {file}" for file in file_list])
        error_message = " ".join(
            [
                "No match found for provided",
                "entity" if len(entities.split("_")) == 1 else "entities",
                f'"{entities}" in\n{pp_file_list}',
            ]
        )
        # look for whole underscore-delimited chunks that merely contain
        # the request; the request is escaped so that it is always
        # treated as literal text and can never break the pattern
        hint_pattern = re.compile(f"[^_]*{re.escape(entities)}[^_]*")
        partial_matches = [
            found.group()
            for found in map(hint_pattern.search, file_list)
            if found is not None
        ]
        if partial_matches:
            if len(partial_matches) == 1:
                error_message += f'\nPerhaps you meant "{partial_matches[0]}"?'
            else:
                error_message = "\n".join(
                    [
                        error_message,
                        "Perhaps you meant one of these?",
                        *[f"- {match}" for match in partial_matches],
                    ]
                )
        raise LookupError(error_message)
    return matches
[docs]
def bids_remove_entity(name, key):
    """Drop every entity with the given key from a BIDS string.

    Parameters
    ----------
    name : str
        BIDS string to remove entity from
    key : str
        BIDS key of entity to remove (trailing hyphens are ignored)

    Returns
    -------
    str
        BIDS name with entity removed

    Examples
    --------
    >>> bids_remove_entity('atlas-Yeo_space-MNI152NLin6_res-2x2x2', 'space')
    'atlas-Yeo_res-2x2x2'
    >>> bids_remove_entity('atlas-Yeo_space-MNI152NLin6_res-2x2x2', 'res')
    'atlas-Yeo_space-MNI152NLin6'
    """
    all_entities = bids_entities_from_filename(name)
    # normalise the key so that 'space' and 'space-' behave alike
    unwanted_prefix = f'{key.rstrip("-")}-'
    kept = [
        chunk for chunk in all_entities if not chunk.startswith(unwanted_prefix)
    ]
    return "_".join(kept)
[docs]
def bids_retrieve_params(bids_config_dict, f_dict, dbg=False):
    """
    Retrieve the BIDS parameters from bids_config_dict for BIDS file
    corresponding to f_dict. If an exact match for f_dict is not found
    the nearest match is returned, corresponding to the BIDS inheritance
    principle.

    :param bids_config_dict: BIDS configuration dictionary, this is a
      multi-level dictionary that maps the components of a bids filename
      (i.e. sub, ses, acq, run) to a dictionary that contains the BIDS
      parameters (RepetitionTime, EchoTime, etc). This information is
      extracted from sidecar json files using the principle of inheritance
      using the bids_parse_configs function. It is not modified.
    :param f_dict: Dictionary built from the name of a file in the BIDS
      format. This is built using the bids_decode_fname by splitting on
      "-" and "_" delimiters
    :param dbg: boolean flag that indicates whether or not debug statements
      should be printed, defaults to "False"
    :return: returns a new dictionary that contains the BIDS parameters,
      with string values encoded to ASCII bytes (non-ASCII characters
      dropped); empty if no parameter dictionary was found
    """
    t_dict = bids_config_dict  # pointer to current dictionary

    # walk down the hierarchy one level at a time, preferring the entry
    # for this file's value and falling back to the generic "none" entry;
    # when neither exists we stay where we are and try the next level
    for level in ["scantype", "site", "sub", "ses", "task", "acq", "rec", "dir", "run"]:
        generic_key = "-".join([level, "none"])
        if level in f_dict:
            key = "-".join([level, f_dict[level]])
        else:
            key = generic_key

        if dbg:
            UTLOGGER.debug(key)

        if key in t_dict:
            t_dict = t_dict[key]
        else:
            if dbg:
                UTLOGGER.debug("Couldn't find %s, so going with %s", key, generic_key)
            if generic_key in t_dict:
                t_dict = t_dict[generic_key]

    if dbg:
        UTLOGGER.debug(t_dict)

    # if we have an image parameter dictionary at this level, use it; we
    # look for "RepetitionTime", because according to the spec it is a
    # mandatory parameter for JSON sidecar files
    if not any("RepetitionTime" in key for key in t_dict):
        return {}

    # build a fresh dictionary so that encoding the values does not rewrite
    # the caller's configuration tree in place
    return {
        k: v.encode("ascii", errors="ignore") if isinstance(v, str) else v
        for k, v in t_dict.items()
    }
[docs]
def bids_parse_sidecar(config_dict, dbg=False, raise_error=True):
    # type: (dict, bool, bool) -> dict
    """
    Uses the BIDS principle of inheritance to build a data structure that
    maps parameters in side car .json files to components in the names of
    corresponding nifti files.

    :param config_dict: dictionary that maps paths of sidecar json files
       (the key) to a dictionary containing the contents of the files (the
       values); a value that is a list is replaced by its first element
    :param dbg: boolean flag that indicates whether or not debug statements
       should be printed
    :param raise_error: passed on to bids_decode_fname when decoding each
       sidecar path
    :return: a dictionary that maps parameters to components from BIDS filenames
       such as sub, sess, run, acq, and scan type
    :raises ValueError: if the contents of a sidecar cannot be merged into a
       dictionary
    """
    levels = ("scantype", "site", "sub", "ses", "task", "acq", "rec", "dir", "run")

    # we are going to build a large-scale data structure, consisting of many
    # levels of dictionaries to hold the data.
    bids_config_dict = {}

    # initialize 'default' entries, this essentially is a pointer traversal
    # of the dictionary
    t_dict = bids_config_dict
    for level in levels:
        key = "-".join([level, "none"])
        t_dict[key] = {}
        t_dict = t_dict[key]

    if dbg:
        UTLOGGER.debug(bids_config_dict)

    # the paths contain the information needed to map the parameters from
    # the jsons (the vals of the config_dict) to corresponding nifti files.
    # We sort the list by the number of path components, so that we can
    # iterate from the outer most path to inner-most, which will help us
    # address the BIDS inheritance principle
    config_paths = sorted(config_dict.keys(), key=lambda p: len(p.split("/")))

    if dbg:
        UTLOGGER.debug(config_paths)

    for cp in config_paths:
        if dbg:
            UTLOGGER.debug("processing %s", cp)

        # decode the filepath into its various components as defined by BIDS
        f_dict = bids_decode_fname(cp, raise_error=raise_error)

        # start from any parameters that already apply at this level
        # (inherited from sidecars processed earlier), then layer this
        # sidecar's own contents on top
        t_params = bids_retrieve_params(bids_config_dict, f_dict)
        bids_config = {}
        if t_params:
            bids_config.update(t_params)

        t_config = config_dict[cp]
        if isinstance(t_config, list):
            # sidecar contents wrapped in a list: use the first element
            # (an empty list contributes nothing)
            t_config = t_config[0] if t_config else {}

        try:
            bids_config.update(t_config)
        except ValueError as exc:
            err = (
                "\n[!] Could not properly parse the AWS S3 path provided "
                "- please double-check the bucket and the path.\n\nNote: "
                "This could either be an issue with the path or the way "
                "the data is organized in the directory. You can also "
                "try providing a specific site sub-directory.\n\n"
            )
            raise ValueError(err) from exc

        # now put the configuration in the data structure, by first iterating
        # to the location of the key, and then inserting it. When a key isn't
        # defined we use the "none" value. A "none" indicates that the
        # corresponding parameters apply to all possible settings of that key
        # e.g. run-1, run-2, ... will all map to run-none if no jsons
        # explicitly define values for those runs
        t_dict = bids_config_dict  # pointer to current dictionary
        for level in levels:
            if level in f_dict:
                key = "-".join([level, f_dict[level]])
            else:
                key = "-".join([level, "none"])
            if key not in t_dict:
                t_dict[key] = {}
            t_dict = t_dict[key]

        t_dict.update(bids_config)

    return bids_config_dict
[docs]
def bids_shortest_entity(file_list):
    """Find the file(s) with the shortest chain of BIDS entities.

    Parameters
    ----------
    file_list : list of strings

    Returns
    -------
    str, list of str, or None
        the single file with the fewest entities; a list of every file
        tied for the fewest (in their original order) when more than one
        has that minimum length; ``None`` for an empty ``file_list``

    Examples
    --------
    >>> bids_shortest_entity([
    ...     's3://fake/data/sub-001_ses-001_task-MSIT_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER2_bold.nii.gz'
    ... ])
    's3://fake/data/sub-001_ses-001_bold.nii.gz'
    """
    entity_counts = [len(bids_entities_from_filename(path)) for path in file_list]
    if not entity_counts:
        return None

    fewest = min(entity_counts)
    candidates = [
        path for path, count in zip(file_list, entity_counts) if count == fewest
    ]
    if len(candidates) == 1:
        return candidates[0]
    return candidates
[docs]
def gen_bids_outputs_sublist(base_path, paths_list, key_list, creds_path):
    """Build a list of per-run resource dictionaries from output paths.

    Each path is expected to look like
    ``<base_path>/<subj_info>/<resource>/...``, with a further
    ``<run_info>`` component after ``<resource>`` for the functional
    resources (``functional_to_anat_linear_xfm``, ``motion_params``,
    ``movement_parameters``, ``motion_correct``).

    Parameters
    ----------
    base_path : str
        prefix to strip from every path (a trailing ``/`` is optional)
    paths_list : list of str
        paths to outputs; those whose resource is not in ``key_list``
        are ignored
    key_list : list of str
        resources that every returned entry must contain
    creds_path : str or None
        stored under ``'creds_path'`` in each entry when truthy

    Returns
    -------
    list of dict
        one dictionary per functional run that has every requested
        functional resource, for subjects that have every requested
        non-functional resource. Subjects without any functional run
        contribute no entries.
    """
    import copy

    func_keys = [
        "functional_to_anat_linear_xfm",
        "motion_params",
        "movement_parameters",
        "motion_correct",
    ]
    top_keys = list(set(key_list) - set(func_keys))
    bot_keys = list(set(key_list).intersection(func_keys))

    subjdict = {}

    if not base_path.endswith("/"):
        base_path = base_path + "/"

    # output directories are a bit different than standard BIDS, so
    # we handle things differently
    for p in paths_list:
        p = p.rstrip()

        # find the participant and session info which should be at
        # some level in the path
        parts = p.replace(base_path, "").split("/")
        subj_info = parts[0]
        resource = parts[1]

        if resource not in key_list:
            continue

        if subj_info not in subjdict:
            subjdict[subj_info] = {"subj_info": subj_info}

        if creds_path:
            subjdict[subj_info]["creds_path"] = creds_path

        if resource in func_keys:
            run_info = parts[2]
            funcs = subjdict[subj_info].setdefault("funcs", {})
            run_res = funcs.setdefault(run_info, {"run_info": run_info})
            if resource in run_res:
                UTLOGGER.warning("resource %s already exists in subjdict ??", resource)
            run_res[resource] = p
        else:
            subjdict[subj_info][resource] = p

    sublist = []
    for subj_info, subj_res in subjdict.items():
        missing_top = next((k for k in top_keys if k not in subj_res), None)
        if missing_top is not None:
            UTLOGGER.warning("%s not found for %s", missing_top, subj_info)
            continue

        # .get(): a subject may have no functional resources at all
        for func_key, func_res in subj_res.get("funcs", {}).items():
            # completeness is judged per run, so one incomplete run does
            # not exclude the subject's other runs
            missing_bot = next((k for k in bot_keys if k not in func_res), None)
            if missing_bot is not None:
                UTLOGGER.warning("%s not found for %s", missing_bot, func_key)
                continue

            UTLOGGER.info("adding: %s, %s, %d", subj_info, func_key, len(sublist))
            tdict = copy.deepcopy(subj_res)
            del tdict["funcs"]
            tdict.update(func_res)
            sublist.append(tdict)
    return sublist
[docs]
def bids_gen_cpac_sublist(
    bids_dir,
    paths_list,
    config_dict,
    creds_path,
    dbg=False,
    raise_error=True,
    only_one_anat=True,
):
    """
    Generates a CPAC formatted subject list from information contained in a
    BIDS formatted set of data.

    Parameters
    ----------
    bids_dir : str
        base directory that contains all of the data, this could be a
        directory that contains data for a multiple BIDS datasets, in
        which case the intervening directories will be interpreted as
        site names
    paths_list : list of str
        all nifti files found in bids_dir, these paths are relative to
        bids_dir
    config_dict : dict
        dictionary that contains information from the JSON sidecars
        found in bids_dir, keys are relative paths and values are
        dictionaries containing all of the parameter information. if
        config_dict is None (or empty), the subject list will be built
        without the parameters
    creds_path : str
        if using S3 bucket, this path credentials needed to access the
        bucket, if accessing anonymous bucket, this can be set to None
    dbg : bool
        indicating whether or not the debug statements should be
        printed
    raise_error : bool
        passed on to the filename decoder and sidecar parser
    only_one_anat : bool
        if True (the default), the "anat" entry for each anatomical scan
        type is the path of the first such file found, as a string; if
        False, it is a list of the paths of every such file found, for
        callers that will filter the list down to a single path later

    Returns
    -------
    list
        a list of dictionaries suitable for use by CPAC to specify data
        to be processed; sessions with neither "anat" nor "func" are left
        out (with a warning)

    Raises
    ------
    IOError
        if a nifti filename has no ``sub-`` entity
    """
    if dbg:
        UTLOGGER.debug(
            "gen_bids_sublist called with:\n  bids_dir: %s\n  # paths: %s"
            "\n  config_dict: %s\n  creds_path: %s",
            bids_dir,
            len(paths_list),
            "missing" if not config_dict else "found",
            creds_path,
        )

    # if configuration information is not desired, config_dict will be empty,
    # otherwise parse the information in the sidecar json files into a dict
    # we can use to extract data for our nifti files
    if config_dict:
        bids_config_dict = bids_parse_sidecar(config_dict, raise_error=raise_error)

    subdict = {}
    for p in paths_list:
        # only paths whose leading component mentions a subject are used
        # NOTE(review): when bids_dir occurs in p, the component tested is
        # taken from whatever precedes bids_dir -- confirm this is intended
        if bids_dir in p:
            val = p.split(bids_dir)[0].rsplit("/")[0]
        else:
            val = p.split("/")[0]
        if "sub-" not in val:
            continue

        p = p.rstrip()
        f = os.path.basename(p)
        if not f.endswith((".nii", ".nii.gz")):
            continue

        f_dict = bids_decode_fname(p, raise_error=raise_error)

        scan_path = os.path.join(bids_dir, p)
        if config_dict:
            t_params = bids_retrieve_params(bids_config_dict, f_dict)
            if not t_params:
                UTLOGGER.warning(
                    "Did not receive any parameters for %s, is this a problem?", p
                )
            task_info = {"scan": scan_path, "scan_parameters": t_params.copy()}
        else:
            task_info = scan_path

        if "ses" not in f_dict:
            f_dict["ses"] = "1"

        if "sub" not in f_dict:
            raise IOError(f"sub not found in {p}, perhaps it isn't in BIDS format?")

        sessions = subdict.setdefault(f_dict["sub"], {})
        if f_dict["ses"] not in sessions:
            sessions[f_dict["ses"]] = {
                "creds_path": creds_path,
                "site_id": "-".join(["site", f_dict["site"]]),
                "subject_id": "-".join(["sub", f_dict["sub"]]),
                "unique_id": "-".join(["ses", f_dict["ses"]]),
            }
        ses_dict = sessions[f_dict["ses"]]
        scantype = f_dict["scantype"]

        if "T1w" in scantype or "T2w" in scantype:
            if "lesion" in f_dict and "mask" in f_dict["lesion"]:
                if "lesion_mask" not in ses_dict:
                    # store the bare path whether or not sidecar
                    # parameters were requested
                    ses_dict["lesion_mask"] = scan_path
                else:
                    UTLOGGER.warning(
                        "Lesion mask file (%s) already found for (%s:%s)"
                        " discarding %s",
                        ses_dict["lesion_mask"],
                        f_dict["sub"],
                        f_dict["ses"],
                        p,
                    )
            # TODO deal with scan parameters anatomical
            anat = ses_dict.setdefault("anat", {})
            if scantype not in anat:
                anat[scantype] = scan_path if only_one_anat else []
            if not only_one_anat:
                anat[scantype].append(scan_path)

        if "bold" in scantype:
            task_key = f_dict["task"]
            if "run" in f_dict:
                task_key = "_".join([task_key, "-".join(["run", f_dict["run"]])])
            if "acq" in f_dict:
                task_key = "_".join([task_key, "-".join(["acq", f_dict["acq"]])])
            func = ses_dict.setdefault("func", {})
            if task_key not in func:
                if not isinstance(task_info, dict):
                    task_info = {"scan": task_info}
                func[task_key] = task_info
            else:
                UTLOGGER.warning(
                    "Func file (%s) already found for (%s: %s: %s) discarding %s",
                    func[task_key],
                    f_dict["sub"],
                    f_dict["ses"],
                    task_key,
                    p,
                )

        if "phase" in scantype or "magnitude" in scantype:
            # first file found for each field map scan type wins
            ses_dict.setdefault("fmap", {}).setdefault(scantype, task_info)

        # EPI field maps are only used when acquired for fMRI, and are
        # keyed by phase-encoding direction
        if "epi" in scantype and "fMRI" in f_dict.get("acq", ""):
            pe_dir = f_dict.get("dir")
            if pe_dir is None:
                UTLOGGER.warning(
                    "EPI field map (%s) has no dir entity, so its phase-encoding"
                    " direction is unknown; discarding it",
                    p,
                )
            else:
                ses_dict.setdefault("fmap", {}).setdefault(
                    f"epi_{pe_dir}", task_info
                )

    sublist = []
    for sub in subdict.values():
        for ses in sub.values():
            if "anat" in ses or "func" in ses:
                sublist.append(ses)
            else:
                # neither is present here, so report both
                site_label = ses["site_id"] if "none" not in ses["site_id"] else ""
                UTLOGGER.warning(
                    "%s %s %s is missing an anat",
                    site_label,
                    ses["subject_id"],
                    ses["unique_id"],
                )
                UTLOGGER.warning(
                    "%s %s %s is missing a func",
                    site_label,
                    ses["subject_id"],
                    ses["unique_id"],
                )
    return sublist
[docs]
def collect_bids_files_configs(bids_dir, aws_input_creds=""):
    """Collect NIfTI paths and JSON sidecar contents from a BIDS directory.

    Only files whose names contain one of the recognised suffixes (T1w,
    T2w, bold, epi, phasediff, phase1, phase2, magnitude, magnitude1,
    magnitude2) are collected; ``epi`` only counts for names that also
    contain ``acq-fMRI``. Each file is collected once, however many
    suffixes its name contains.

    :param bids_dir: local directory, or ``s3://bucket/prefix`` location,
       to search
    :param aws_input_creds: path to an AWS credentials file for S3 access;
       if given, it must exist
    :return: tuple of (list of NIfTI paths, dict mapping each JSON sidecar
       path to its parsed contents), paths being relative to bids_dir
    :raises IOError: if aws_input_creds is given but is not a file, or if
       no matching files are found
    :raises UnicodeDecodeError: if a local JSON sidecar cannot be decoded
    :raises BotoCoreError: if a JSON sidecar cannot be retrieved or parsed
       from S3
    """
    file_paths = []
    config_dict = {}

    suffixes = [
        "T1w",
        "T2w",
        "bold",
        "epi",
        "phasediff",
        "phase1",
        "phase2",
        "magnitude",
        "magnitude1",
        "magnitude2",
    ]

    def _has_bids_suffix(name):
        """Report whether ``name`` contains a usable recognised suffix."""
        return any(
            suf in name and not (suf == "epi" and "acq-fMRI" not in name)
            for suf in suffixes
        )

    if bids_dir.lower().startswith("s3://"):
        # s3 paths begin with s3://bucket/
        bucket_name = bids_dir.split("/")[2]
        s3_prefix = "/".join(bids_dir.split("/")[:3])
        prefix = bids_dir.replace(s3_prefix, "").lstrip("/")

        if aws_input_creds:
            if not os.path.isfile(aws_input_creds):
                raise IOError("Could not find aws_input_creds (%s)" % (aws_input_creds))

        from indi_aws import fetch_creds

        bucket = fetch_creds.return_bucket(aws_input_creds, bucket_name)

        UTLOGGER.info("gathering files from S3 bucket (%s) for %s", bucket, prefix)

        for s3_obj in bucket.objects.filter(Prefix=prefix):
            key = str(s3_obj.key)
            if not _has_bids_suffix(key):
                continue
            rel_key = key.replace(prefix, "").lstrip("/")
            if key.endswith("json"):
                try:
                    config_dict[rel_key] = json.loads(s3_obj.get()["Body"].read())
                except Exception as e:
                    msg = f"Error retrieving {key.replace(prefix, '')} ({e})"
                    # NOTE(review): botocore's BotoCoreError appears to
                    # accept keyword arguments only -- confirm that this
                    # positional call constructs the exception as intended
                    raise BotoCoreError(msg) from e
            elif "nii" in key:
                file_paths.append(rel_key)
    else:
        for root, _dirs, files in os.walk(bids_dir, topdown=False, followlinks=True):
            for f in files:
                if not _has_bids_suffix(f):
                    continue
                full_path = os.path.join(root, f)
                if "nii" in f:
                    file_paths.append(full_path.replace(bids_dir, "").lstrip("/"))
                if f.endswith("json"):
                    rel_path = os.path.join(root.replace(bids_dir, "").lstrip("/"), f)
                    try:
                        with open(full_path, "r", encoding="utf-8") as sidecar:
                            config_dict[rel_path] = json.load(sidecar)
                    except UnicodeDecodeError as e:
                        # re-raise the same exception type, naming the
                        # offending file in the reason
                        raise UnicodeDecodeError(
                            e.encoding,
                            e.object,
                            e.start,
                            e.end,
                            f"{e.reason} (could not decode {full_path})",
                        ) from e

    if not file_paths and not config_dict:
        msg = (
            f"Didn't find any files in {bids_dir}. Please verify that the path is"
            " typed correctly, that you have read access to the directory, and that it"
            " is not empty."
        )
        raise IOError(msg)

    return file_paths, config_dict
[docs]
def camelCase(string: str) -> str:  # pylint: disable=invalid-name
    """Join the hyphen-separated pieces of a string in camelCase.

    The first piece is left untouched; each later piece has its first
    character upper-cased and the rest left as is.

    Parameters
    ----------
    string : str
        string to convert to camelCase

    Returns
    -------
    str

    Examples
    --------
    >>> camelCase('PearsonNilearn-aCompCor')
    'PearsonNilearnACompCor'
    >>> camelCase('mean-Pearson-Nilearn-aCompCor')
    'meanPearsonNilearnACompCor'
    """
    head, *tail = string.split("-")
    # slicing (rather than indexing) keeps empty pieces harmless
    return head + "".join(piece[:1].upper() + piece[1:] for piece in tail)
[docs]
def combine_multiple_entity_instances(bids_str: str) -> str:
    """Merge repeated BIDS entity keys into one camelCased instance each.

    The string is split on underscores. The last piece is treated as the
    suffix and camelCased on its own. Among the preceding pieces, those
    containing a hyphen are grouped by key (in order of first appearance)
    and each group's values are joined and camelCased; pieces without a
    hyphen are not carried over. A ``desc`` entity, if present, is moved
    to sit directly before the suffix.

    Parameters
    ----------
    bids_str : str
        underscore-delimited BIDS-style string

    Returns
    -------
    str
        the string with at most one instance of each entity key

    Examples
    --------
    >>> combine_multiple_entity_instances(
    ...     'sub-1_ses-HBN_site-RU_task-rest_atlas-AAL_'
    ...     'desc-Nilearn_desc-36-param_suffix.ext')
    'sub-1_ses-HBN_site-RU_task-rest_atlas-AAL_desc-Nilearn36Param_suffix.ext'
    >>> combine_multiple_entity_instances(
    ...     'sub-1_ses-HBN_site-RU_task-rest_'
    ...     'run-1_framewise-displacement-power.1D')
    'sub-1_ses-HBN_site-RU_task-rest_run-1_framewiseDisplacementPower.1D'
    """
    *leading, final = bids_str.split("_")
    trailing = [camelCase(final)]
    grouped = {}
    for chunk in leading:
        key, hyphen, value = chunk.partition("-")
        if hyphen:  # only keyed ('key-value') pieces are collected
            grouped.setdefault(key, []).append(value)
    combined = {
        key: camelCase("-".join(values)) for key, values in grouped.items()
    }
    if "desc" in combined:  # make 'desc' final entity
        trailing.insert(0, f'desc-{combined.pop("desc")}')
    keyed = [f"{key}-{value}" for key, value in combined.items()]
    return "_".join(keyed + trailing)
[docs]
def insert_entity(resource, key, value):
    """Add a ``key-value`` BIDS entity to a resource name.

    The new entity is placed after every existing non-``desc`` entity
    and before any ``desc-`` entities; the suffix (the final
    underscore-delimited piece) always stays last.

    Parameters
    ----------
    resource, key, value : str

    Returns
    -------
    str
        the resource name including the new entity

    Examples
    --------
    >>> insert_entity('run-1_desc-preproc_bold', 'reg', 'default')
    'run-1_reg-default_desc-preproc_bold'
    >>> insert_entity('run-1_bold', 'reg', 'default')
    'run-1_reg-default_bold'
    >>> insert_entity('run-1_desc-preproc_bold', 'filt', 'notch4c0p31bw0p12')
    'run-1_filt-notch4c0p31bw0p12_desc-preproc_bold'
    >>> insert_entity('run-1_reg-default_bold', 'filt', 'notch4c0p31bw0p12')
    'run-1_reg-default_filt-notch4c0p31bw0p12_bold'
    """
    *entities, suffix = resource.split("_")
    before_desc = [item for item in entities if not item.startswith("desc-")]
    desc_entities = [item for item in entities if item.startswith("desc-")]
    return "_".join(before_desc + [f"{key}-{value}"] + desc_entities + [suffix])
[docs]
def load_yaml_config(config_filename, aws_input_creds):
    """Load a YAML config from a data URI, an S3 URL or a local path.

    Parameters
    ----------
    config_filename : str
        One of (prefixes are matched case-insensitively):

        - a ``data:`` URI; everything after the first comma is
          base64-decoded and parsed as YAML,
        - an ``s3://bucket/key`` URL; the object is downloaded to
          ``/tmp/<basename>`` and then read from there, or
        - a path on the local filesystem.

    aws_input_creds : str or None
        path to an AWS credentials file; only consulted for ``s3://``
        inputs, where it is handed to ``fetch_creds.return_bucket``

    Returns
    -------
    object
        whatever ``yaml.safe_load`` produces for the document

    Raises
    ------
    BotoCoreError
        if a data URI cannot be split, base64-decoded or parsed
    IOError
        if ``aws_input_creds`` is given but is not an existing file
    FileNotFoundError
        if the (local or downloaded) config file cannot be opened or read
    """
    if config_filename.lower().startswith("data:"):
        # Imported locally so this branch does not depend on a module-level
        # import being present.
        from base64 import b64decode

        try:
            _header, encoded = config_filename.split(",", 1)
            return yaml.safe_load(b64decode(encoded))
        except (ValueError, yaml.YAMLError) as error:
            # ValueError covers a URI without a comma and invalid base64
            # (binascii.Error is a ValueError subclass).
            msg = f"Error! Could not find load config from data URI {config_filename}"
            # BotoCoreError.__init__ only takes keyword arguments (it formats
            # its class-level ``fmt``), so passing the message positionally
            # raised TypeError instead of the intended error. Build the
            # instance first, then attach the message.
            data_uri_error = BotoCoreError()
            data_uri_error.args = (msg,)
            raise data_uri_error from error
    if config_filename.lower().startswith("s3://"):
        # s3 paths begin with s3://bucket/
        bucket_name = config_filename.split("/")[2]
        s3_prefix = "/".join(config_filename.split("/")[:3])
        prefix = config_filename.replace(s3_prefix, "").lstrip("/")
        if aws_input_creds and not os.path.isfile(aws_input_creds):
            raise IOError(f"Could not find aws_input_creds ({aws_input_creds})")
        from indi_aws import fetch_creds

        bucket = fetch_creds.return_bucket(aws_input_creds, bucket_name)
        downloaded_config = "/tmp/" + os.path.basename(config_filename)
        bucket.download_file(prefix, downloaded_config)
        config_filename = downloaded_config
    config_filename = os.path.realpath(config_filename)
    try:
        # context manager so the handle is closed even if parsing fails
        with open(config_filename, "r") as config_file:
            return yaml.safe_load(config_file)
    except IOError as error:
        msg = f"Error! Could not find config file {config_filename}"
        raise FileNotFoundError(msg) from error
[docs]
def cl_strip_brackets(arg_list):
    """Remove '[' from before the first and ']' from after the final
    argument in a list of commandline arguments.

    The input list is not modified; a new list is returned. Arguments
    that are empty (or become empty once the brackets are stripped) are
    dropped.

    Parameters
    ----------
    arg_list : list of str
        commandline arguments; a bare string is treated as a one-element
        list, and an empty or ``None`` value yields an empty list

    Returns
    -------
    list
        the non-empty arguments without the enclosing brackets

    Examples
    --------
    >>> cl_strip_brackets('[a b c]'.split(' '))
    ['a', 'b', 'c']
    >>> cl_strip_brackets('a b c'.split(' '))
    ['a', 'b', 'c']
    >>> cl_strip_brackets('[ a b c ]'.split(' '))
    ['a', 'b', 'c']
    >>> cl_strip_brackets([])
    []
    """
    if not arg_list:
        return []
    if isinstance(arg_list, str):
        # item assignment on a str would fail and list(str) would split it
        # into characters, so wrap a lone argument instead
        arg_list = [arg_list]
    # work on a copy so the caller's list is left untouched
    stripped = list(arg_list)
    stripped[0] = stripped[0].lstrip("[")
    stripped[-1] = stripped[-1].rstrip("]")
    return [arg for arg in stripped if arg]
[docs]
def create_cpac_data_config(
    bids_dir,
    participant_labels=None,
    aws_input_creds=None,
    skip_bids_validator=False,
    only_one_anat=True,
):
    """
    Create a C-PAC data config (subject list) from a BIDS directory.

    Files are collected with ``collect_bids_files_configs``, optionally
    narrowed to the requested participants, and converted to a subject
    list with ``bids_gen_cpac_sublist``. If no files or no subjects
    remain, an error is logged and the process exits with status 1.

    Parameters
    ----------
    bids_dir : str
        BIDS directory to parse
    participant_labels : list or None
        if given, only file paths containing at least one of these
        labels as a substring are kept
    aws_input_creds
        passed through to ``collect_bids_files_configs`` and
        ``bids_gen_cpac_sublist``
    skip_bids_validator : bool
        when true, ``bids_gen_cpac_sublist`` is called with
        ``raise_error=False``
    only_one_anat : bool
        passed through to ``bids_gen_cpac_sublist``.
        The "anat" key for a subject expects a string value, but we
        can temporarily store a list instead by passing True here if
        we will be filtering that list down to a single string later
        (NOTE(review): semantics live in ``bids_gen_cpac_sublist`` --
        confirm there)

    Returns
    -------
    list
        subject list produced by ``bids_gen_cpac_sublist``

    Raises
    ------
    SystemExit
        if no matching files or no subjects are found
    """
    UTLOGGER.info("Parsing %s..", bids_dir)
    file_paths, config = collect_bids_files_configs(bids_dir, aws_input_creds)
    if participant_labels and file_paths:
        file_paths = [
            file_path
            for file_path in file_paths
            if any(
                participant_label in file_path
                for participant_label in participant_labels
            )
        ]
    if not file_paths:
        if participant_labels:
            UTLOGGER.error("Did not find data for %s", ", ".join(participant_labels))
        else:
            # no labels to report (participant_labels may be None, which
            # cannot be joined), so name the directory instead
            UTLOGGER.error("Did not find data in %s", bids_dir)
        sys.exit(1)
    sub_list = bids_gen_cpac_sublist(
        bids_dir,
        file_paths,
        config,
        aws_input_creds,
        raise_error=not skip_bids_validator,
        only_one_anat=only_one_anat,
    )
    if not sub_list:
        UTLOGGER.error("Did not find data in %s", bids_dir)
        sys.exit(1)
    return sub_list
[docs]
def load_cpac_data_config(data_config_file, participant_labels, aws_input_creds):
    """
    Loads the file as a check to make sure it is available and readable.

    When ``participant_labels`` is given, the loaded subject list is
    reduced to entries whose ``subject_id`` (with a ``sub-`` prefix
    added if missing) is one of those labels. If nothing is left, an
    error is logged and the process exits with status 1.

    Parameters
    ----------
    data_config_file : str
        path to data config (local path, ``s3://`` URL or ``data:`` URI,
        as accepted by ``load_yaml_config``)
    participant_labels : list or None
        ``sub-``-prefixed labels to keep; falsy to keep every subject
    aws_input_creds
        passed through to ``load_yaml_config``

    Returns
    -------
    list
        the (possibly filtered) subject list

    Raises
    ------
    SystemExit
        if ``participant_labels`` is given and no subject matches
    """
    sub_list = load_yaml_config(data_config_file, aws_input_creds)
    if not participant_labels:
        return sub_list
    # with_key() also copes with non-string IDs (e.g. an unquoted numeric
    # subject_id parsed from YAML as an int)
    sub_list = [
        sub for sub in sub_list if with_key(sub["subject_id"], "sub") in participant_labels
    ]
    if not sub_list:
        # data URIs embed the entire config, so do not echo them into the
        # log; matched case-insensitively like load_yaml_config does
        is_data_uri = data_config_file.lower().startswith("data:")
        UTLOGGER.error(
            "Did not find data for %s in %s",
            ", ".join(participant_labels),
            "data URI" if is_data_uri else data_config_file,
        )
        sys.exit(1)
    return sub_list
[docs]
def res_in_filename(cfg, label):
    """Swap a named resolution in a label for the configured value.

    If the label has a ``_res-<name>_`` entity whose name is ``anat``,
    ``bold`` or ``derivative``, the name is replaced with the matching
    resolution from the pipeline configuration. Any other value, or a
    label without a ``_res-`` entity, is returned unchanged.

    Parameters
    ----------
    cfg : CPAC.utils.configuration.Configuration
        pipeline configuration, indexed with tuples of nested keys
    label : str
        resource label, possibly containing a ``_res-`` entity

    Returns
    -------
    label : str

    Examples
    --------
    >>> from CPAC.utils.configuration import Configuration
    >>> res_in_filename(Configuration({
    ...     'registration_workflows': {
    ...         'anatomical_registration': {'resolution_for_anat': '2x2x2'}}}),
    ...     'sub-1_res-anat_bold')
    'sub-1_res-2x2x2_bold'
    >>> res_in_filename(Configuration({
    ...     'registration_workflows': {
    ...         'anatomical_registration': {'resolution_for_anat': '2x2x2'}}}),
    ...     'sub-1_res-3mm_bold')
    'sub-1_res-3mm_bold'
    """
    if "_res-" not in label:
        return label
    # text between the first '_res-' and the following underscore
    placeholder = label.partition("_res-")[2].partition("_")[0]
    func_output_resolution = (
        "registration_workflows",
        "functional_registration",
        "func_registration_to_template",
        "output_resolution",
    )
    configured = {
        "anat": cfg[
            (
                "registration_workflows",
                "anatomical_registration",
                "resolution_for_anat",
            )
        ],
        "bold": cfg[(*func_output_resolution, "func_preproc_outputs")],
        "derivative": cfg[(*func_output_resolution, "func_derivative_outputs")],
    }
    # unknown names fall back to themselves, leaving literal resolutions as-is
    resolution = configured.get(placeholder, placeholder)
    return re.sub("_res-[A-Za-z0-9]*_", f"_res-{resolution}_", label)
[docs]
def sub_list_filter_by_labels(sub_list, labels):
    """Function to filter a sub_list by provided BIDS labels for
    specified suffixes.

    The ``labels`` mapping and any list stored in it are left unmodified.

    Parameters
    ----------
    sub_list : list
        subject list to filter
    labels : dict
    labels['T1w'] : str or None
        C-PAC currently only uses a single T1w image
    labels['bold'] : str, list, or None
        a single label or a list of commandline arguments, optionally
        wrapped in '[' ... ']'

    Returns
    -------
    list
        ``sub_list`` itself if no labels are given, otherwise the
        filtered list
    """
    t1w_label = labels.get("T1w")
    if t1w_label:
        sub_list = _sub_list_filter_by_label(sub_list, "T1w", t1w_label)
    bold_labels = labels.get("bold")
    if bold_labels:
        # A lone string must be wrapped (bracket stripping assigns by index),
        # and a copy is passed so the caller's list is never altered.
        if isinstance(bold_labels, str):
            bold_labels = [bold_labels]
        bold_labels = cl_strip_brackets(list(bold_labels))
        sub_list = _sub_list_filter_by_label(sub_list, "bold", bold_labels)
    return sub_list
[docs]
def with_key(entity: str, key: str) -> str:
    """Make sure a BIDS entity carries its ``key-`` prefix.

    Non-string entities are converted with ``str`` first. An entity that
    already starts with ``f"{key}-"`` is returned as is.

    Parameters
    ----------
    entity, key : str

    Returns
    -------
    str
        the entity in ``key-value`` form

    Examples
    --------
    >>> with_key('sub-1', 'sub')
    'sub-1'
    >>> with_key('1', 'sub')
    'sub-1'
    """
    text = entity if isinstance(entity, str) else str(entity)
    if text.startswith(f"{key}-"):
        return text
    return "-".join([key, text])
[docs]
def without_key(entity: str, key: str) -> str:
    """Return a BIDS entity value.

    Only a leading ``f"{key}-"`` is removed; later occurrences of the
    same text inside the value are preserved. Non-string entities are
    converted with ``str`` first.

    Parameters
    ----------
    entity, key : str

    Returns
    -------
    str
        the entity without its ``key-`` prefix

    Examples
    --------
    >>> without_key('sub-1', 'sub')
    '1'
    >>> without_key('1', 'sub')
    '1'
    >>> without_key('sub-a-sub-b', 'sub')
    'a-sub-b'
    """
    if not isinstance(entity, str):
        entity = str(entity)
    prefix = f"{key}-"
    if entity.startswith(prefix):
        # slice rather than str.replace, which would also strip any later
        # occurrence of the prefix inside the value
        entity = entity[len(prefix):]
    return entity
def _t1w_filter(anat, shortest_entity, label):
    """Narrow down T1w path(s) for one subject.

    With ``shortest_entity`` the choice is delegated to
    ``bids_shortest_entity``. Otherwise the paths are matched against
    ``label`` for the ``T1w`` suffix; that result is then matched again
    for ``T2w`` and the second result is used instead, unless the second
    match raises ``LookupError`` or returns ``None``.

    Parameters
    ----------
    anat: list or str
        a lone path is wrapped in a list before filtering
    shortest_entity: bool
    label: str
        entities to match; unused when ``shortest_entity`` is true

    Returns
    -------
    anat: list
        NOTE(review): the actual type is whatever ``bids_shortest_entity``
        / ``bids_match_entities`` return -- confirm against those helpers
    """
    candidates = anat if isinstance(anat, list) else [anat]
    if shortest_entity:
        return bids_shortest_entity(candidates)
    t1w_matches = bids_match_entities(candidates, label, "T1w")
    try:
        t2w_matches = bids_match_entities(t1w_matches, label, "T2w")
    except LookupError:
        # no T2w match: keep the T1w result
        return t1w_matches
    return t1w_matches if t2w_matches is None else t2w_matches
def _sub_anat_filter(anat, shortest_entity, label):
    """Helper function to filter anat paths in sub_list.
    Parameters
    ----------
    anat : list or dict
    shortest_entity : bool
    label : str
    Returns
    -------
    list or dict
        same type as 'anat' parameter
    """
    if isinstance(anat, dict):
        if "T1w" in anat:
            anat["T1w"] = _t1w_filter(anat["T1w"], shortest_entity, label)
        return anat
    return _t1w_filter(anat, shortest_entity, label)
def _sub_list_filter_by_label(sub_list, label_type, label):
    """Function to filter a sub_list by a CLI-provided label.
    Parameters
    ----------
    sub_list : list
    label_type : str
        'T1w' or 'bold'
    label : str or list
    Returns
    -------
    list
    Examples
    --------
    >>> from CPAC.pipeline.test.sample_data import sub_list
    >>> _sub_list_filter_by_label(sub_list, 'bold', 'task-PEER1')[
    ...     0]['func'].keys()
    dict_keys(['PEER1'])
    """
    label_list = [label] if isinstance(label, str) else list(label)
    new_sub_list = []
    if label_type in label_list:
        shortest_entity = True
        label_list.remove(label_type)
    else:
        shortest_entity = False
    if label_type == "T1w":
        for sub in [sub for sub in sub_list if "anat" in sub]:
            try:
                sub["anat"] = _sub_anat_filter(
                    sub["anat"],
                    shortest_entity,
                    label_list[0] if not shortest_entity else None,
                )
                if sub["anat"]:
                    new_sub_list.append(sub)
            except LookupError as lookup_error:
                warn(str(lookup_error))
    elif label_type == "bold":
        for sub in [sub for sub in sub_list if "func" in sub]:
            try:
                all_scans = [sub["func"][scan].get("scan") for scan in sub["func"]]
                new_func = {}
                for entities in label_list:
                    matched_scans = bids_match_entities(all_scans, entities, label_type)
                    for scan in matched_scans:
                        new_func = {
                            **new_func,
                            **_match_functional_scan(sub["func"], scan),
                        }
                if shortest_entity:
                    new_func = {
                        **new_func,
                        **_match_functional_scan(
                            sub["func"], bids_shortest_entity(all_scans)
                        ),
                    }
                sub["func"] = new_func
                new_sub_list.append(sub)
            except LookupError as lookup_error:
                warn(str(lookup_error))
    return new_sub_list
def _match_functional_scan(sub_list_func_dict, scan_file_to_match):
    """Function to subset a scan from a sub_list_func_dict by a scan filename.
    Parameters
    ----------
    sub_list_func_dict : dict
        sub_list[sub]['func']
    scan_file_to_match : str
    Returns
    -------
    dict
    Examples
    --------
    >>> from CPAC.pipeline.test.sample_data import sub_list
    >>> matched = _match_functional_scan(
    ...     sub_list[0]['func'],
    ...     '/fake/data/sub-0001/ses-NFB3/func/'
    ...     'sub-0001_ses-NFB3_task-PEER1_bold.nii.gz')
    >>> matched.keys()
    dict_keys(['PEER1'])
    >>> all([key in matched['PEER1'] for key in [
    ...     'fmap_mag', 'fmap_phase', 'scan', 'scan_parameters'
    ... ]])
    True
    """
    return {
        entity: sub_list_func_dict[entity]
        for entity in sub_list_func_dict
        if sub_list_func_dict[entity].get("scan") == scan_file_to_match
    }