Source code for simba.core.data.loaders

import pickle
from collections.abc import Iterator, Sequence
from typing import IO

import numpy as np
from pyteomics import mgf
from tqdm import tqdm

from simba.config import Config
from simba.core.chemistry import chem_utils
from simba.core.data.nist_loader import NistLoader
from simba.core.data.preprocessing import PreprocessingUtils
from simba.core.data.spectrum import SpectrumExt
from simba.logger_setup import logger
from simba.murcko_scaffold import MurckoScaffold
from simba.preprocessor import Preprocessor
from simba.spectrum_utils import spectrum_hash


[docs] class LoadData:
[docs] def get_spectra( source: IO | str, scan_nrs: Sequence[int] = None, compute_classes=False, config=None, use_gnps_format=True, use_only_protonized_adducts=True, ) -> Iterator[SpectrumExt]: """ Get the MS/MS spectra from the given MGF file, optionally filtering by scan number. Parameters ---------- source : Union[IO, str] The MGF source (file name or open file object) from which the spectra are read. scan_nrs : Sequence[int] Only read spectra with the given scan numbers. If `None`, no filtering on scan number is performed. compute_classes : bool Whether to compute chemical superclass, class and subclass of the molecules using Classyfire. config : Config Configuration object containing parameters for preprocessing. use_gnps_format : bool Whether the MGF file follows the GNPS format. If `False`, it is assumed to follow the Janssen format. Returns ------- Iterator[SpectrumExt] An iterator over the requested spectra in the given file. """ with mgf.MGF(source) as f_in: # Iterate over a subset of spectra filtered by scan number. if scan_nrs is not None: def spectrum_it(): for scan_nr, spectrum_dict in enumerate(f_in): if scan_nr in scan_nrs: yield spectrum_dict # Or iterate over all MS/MS spectra. else: def spectrum_it(): yield from f_in total_results = [] for spectrum in spectrum_it(): # try: if use_only_protonized_adducts: if use_gnps_format: condition, res = LoadData.is_valid_spectrum_gnps( spectrum, config ) else: # janssen format condition, res = LoadData.is_valid_spectrum_janssen( spectrum, config ) else: condition, res = LoadData.default_filters(spectrum, config) total_results.append(res) if condition: # yield spectrum['params']['name'] spec = LoadData._parse_spectrum( spectrum, compute_classes=compute_classes, use_gnps_format=use_gnps_format, ) if spec is not None: yield spec
# except ValueError as e: # pass
[docs] @staticmethod def get_precursor_mz(spectrum): if "pepmass" in spectrum["params"]: if len(spectrum["params"]["pepmass"]) > 0: precursor_mz = float(spectrum["params"]["pepmass"][0]) else: precursor_mz = float(spectrum["params"]["pepmass"]) elif "precursor_mz" in spectrum["params"]: precursor_mz = float(spectrum["params"]["precursor_mz"]) else: precursor_mz = None return precursor_mz
[docs] @staticmethod def default_filters(spectrum: SpectrumExt, config: Config): cond_library = True # all the library is good cond_charge = True precursor_mz = LoadData.get_precursor_mz(spectrum) cond_pepmass = precursor_mz is not None and precursor_mz > 0 cond_mz_array = len(spectrum["m/z array"]) >= config.MIN_N_PEAKS cond_ion_mode = True cond_name = True cond_inchi_smiles = True cond_centroid = PreprocessingUtils.is_centroid(spectrum["intensity array"]) ##cond_name=True ##cond_name=True dict_results = { "cond_library": cond_library, "cond_charge": cond_charge, "cond_pepmass": cond_pepmass, "cond_mz_array": cond_mz_array, "cond_ion_mode": cond_ion_mode, "cond_name": cond_name, "cond_centroid": cond_centroid, "cond_inchi_smiles": cond_inchi_smiles, } # return cond_ion_mode and cond_mz_array total_condition = ( cond_library and cond_charge and cond_pepmass and cond_mz_array and cond_ion_mode and cond_name and cond_centroid and cond_inchi_smiles ) return total_condition, dict_results
[docs] @staticmethod def is_valid_spectrum_janssen(spectrum: SpectrumExt, config: Config): cond_library = True # all the library is good if "charge" in spectrum["params"]: cond_charge = int(spectrum["params"]["charge"][0]) in config.CHARGES else: cond_charge = True # try: # cond_pepmass = float(spectrum["params"]["pepmass"][0]) > 0 # except: # cond_pepmass = float(spectrum["params"]["pepmass"]) > 0 cond_pepmass = True cond_mz_array = len(spectrum["m/z array"]) >= config.MIN_N_PEAKS if "ionmode" in spectrum["params"]: cond_ion_mode = spectrum["params"]["ionmode"].lower() == "positive" else: cond_ion_mode = True if "adduct" in spectrum["params"]: cond_name = spectrum["params"]["adduct"] in [ "M+", "[M+H]+", "M+H", ] else: logger.warning( "Adduct information not found in spectrum. Please make sure the spectra corresponds to protonized adducts [M+H]" ) cond_name = True if "smiles" in spectrum["params"]: cond_inchi_smiles = ( # spectrum['params']["inchi"] != "N/A" or spectrum["params"]["smiles"] != "N/A" ) else: logger.warning("Smiles not found in spectrum.") cond_inchi_smiles = True cond_centroid = PreprocessingUtils.is_centroid(spectrum["intensity array"]) ##cond_name=True ##cond_name=True dict_results = { "cond_library": cond_library, "cond_charge": cond_charge, "cond_pepmass": cond_pepmass, "cond_mz_array": cond_mz_array, "cond_ion_mode": cond_ion_mode, "cond_name": cond_name, "cond_centroid": cond_centroid, "cond_inchi_smiles": cond_inchi_smiles, } # return cond_ion_mode and cond_mz_array total_condition = ( cond_library and cond_charge and cond_pepmass and cond_mz_array and cond_ion_mode and cond_name and cond_centroid and cond_inchi_smiles ) return total_condition, dict_results
[docs] @staticmethod def is_valid_spectrum_gnps(spectrum: SpectrumExt, config: Config): if "libraryquality" in spectrum["params"].keys(): cond_library = int(spectrum["params"]["libraryquality"]) <= 3 else: cond_library = True if "charge" in spectrum["params"]: cond_charge = int(spectrum["params"]["charge"][0]) in config.CHARGES else: cond_charge = True # try to convert to float the pep mass try: cond_pepmass = float(spectrum["params"]["pepmass"][0]) > 0 except: cond_pepmass = False cond_mz_array = len(spectrum["m/z array"]) >= config.MIN_N_PEAKS if "ionmode" in spectrum["params"]: cond_ion_mode = spectrum["params"]["ionmode"] == "Positive" else: cond_ion_mode = True cond_name = spectrum["params"]["name"].rstrip().endswith(" M+H") cond_centroid = PreprocessingUtils.is_centroid(spectrum["intensity array"]) cond_inchi_smiles = ( # spectrum['params']["inchi"] != "N/A" or (spectrum["params"]["smiles"] != "N/A") & (spectrum["params"]["smiles"] != "") ) ##cond_name=True ##cond_name=True dict_results = { "cond_library": cond_library, "cond_charge": cond_charge, "cond_pepmass": cond_pepmass, "cond_mz_array": cond_mz_array, "cond_ion_mode": cond_ion_mode, "cond_name": cond_name, "cond_centroid": cond_centroid, "cond_inchi_smiles": cond_inchi_smiles, } # return cond_ion_mode and cond_mz_array total_condition = ( cond_library and cond_charge and cond_pepmass and cond_mz_array and cond_ion_mode and cond_name and cond_centroid and cond_inchi_smiles ) return total_condition, dict_results
def _parse_spectrum( spectrum_dict: dict, compute_classes: bool = False, use_gnps_format: bool = True, ) -> SpectrumExt: """ Parse the Pyteomics spectrum dict. Parameters ---------- spectrum_dict : Dict The Pyteomics spectrum dict to be parsed. compute_classes : bool Whether to compute chemical superclass, class and subclass of the molecules using Classyfire. use_gnps_format : bool Whether the MGF file follows the GNPS format. If `False`, it is assumed to follow the Janssen format. Returns ------- SpectrumExt The parsed spectrum. """ # identifier = spectrum_dict['params']['title'] if use_gnps_format: # GNPS identifier = spectrum_dict["params"]["spectrumid"] inchi = spectrum_dict["params"]["inchi"] library = spectrum_dict["params"]["organism"] else: # JANSSEN if "id" in spectrum_dict["params"]: identifier = spectrum_dict["params"]["id"] elif "title" in spectrum_dict["params"]: identifier = spectrum_dict["params"]["title"] else: identifier = "none" inchi = "" library = "janssen" params = spectrum_dict["params"] library = library inchi = inchi smiles = params["smiles"] if "smiles" in params else "" precursor_mz = LoadData.get_precursor_mz(spectrum_dict) if precursor_mz is None: return None if "charge" in params: charge = int(params["charge"][0]) else: charge = None ionmode = params["ionmode"].lower() if "ionmode" in params else "none" if "adduct" in params: adduct = params["adduct"].replace(" ", "") adduct_mass = chem_utils.ion_to_mass(adduct) if adduct_mass is None: logger.warning(f"Adduct {adduct} not supported.") adduct = "" adduct_mass = 0.0 else: adduct = "" adduct_mass = 0.0 ce = params["ce"] if "ce" in params else None ia = params["ion_activation"] if "ion_activation" in params else None im = params["ionization_method"] if "ionization_method" in params else None # compute hash id value spectrum_hash_result = spectrum_hash( spectrum_dict["m/z array"], spectrum_dict["intensity array"] ) # calculate Murcko-Scaffold class bms = MurckoScaffold.get_bm_scaffold(smiles) # classes if compute_classes: clss = PreprocessingUtils.get_class(inchi, smiles) superclass = clss[0] classe = clss[1] subclass = clss[2] else: superclass = None classe = None subclass = None spec = SpectrumExt( identifier=identifier, precursor_mz=precursor_mz, # Re-assign charge 0 to 1. precursor_charge=charge, mz=np.array(spectrum_dict["m/z array"]), intensity=np.array(spectrum_dict["intensity array"]), retention_time=np.nan, params=params, library=library, inchi=inchi, smiles=smiles, ionmode=ionmode, adduct_mass=adduct_mass, ce=ce, ion_activation=ia, ionization_method=im, bms=bms, superclass=superclass, classe=classe, subclass=subclass, inchi_key=params["inchikey"] if "inchikey" in params else None, spectrum_hash=spectrum_hash_result, ) # postprocessing # spec=spec.remove_precursor_peak(0.1, "Da") # spec=spec.filter_intensity(0.01) return spec
[docs] def get_all_spectra_mgf( file: IO | str, num_samples: int = -1, compute_classes: bool = False, use_tqdm: bool = True, config=None, use_gnps_format: bool = True, use_only_protonized_adducts=True, ) -> list[SpectrumExt]: """ Get the MS/MS spectra from the given MGF file, optionally filtering by scan number. Parameters ---------- file : Union[IO, str] The MGF file (file name or open file object) from which the spectra are read. num_samples : int The maximum number of spectra to read. If -1, all spectra are read. compute_classes : bool Whether to compute chemical superclass, class and subclass of the molecules using Classyfire. use_tqdm : bool Whether to display a progress bar using tqdm. config : Config Configuration object containing parameters for preprocessing. use_gnps_format : bool Whether the MGF file follows the GNPS format. If `False`, it is assumed to follow the Janssen format. use_only_protonized_adducts : bool Whether to filter spectra to only include those with protonated adducts ([M+H]+). Returns ------- List[SpectrumExt] A list of the parsed spectra. """ num_samples = 10**8 if num_samples == -1 else num_samples spectra = [] # to save all the spectra spectra_to_process = LoadData.get_spectra( file, compute_classes=compute_classes, config=config, use_gnps_format=use_gnps_format, use_only_protonized_adducts=use_only_protonized_adducts, ) if use_tqdm: iterator = tqdm(range(0, num_samples)) else: iterator = range(0, num_samples) # preprocessor pp = Preprocessor() for i in iterator: try: spectrum = next(spectra_to_process) # spectrum = pp.preprocess_spectrum(spectrum) spectra.append(spectrum) except StopIteration: # in case it is not possible to get more samples logger.info(f"Only {i} spectra found.") break # go to next iteration return spectra
[docs] def get_all_spectra_nist( file, num_samples=10, compute_classes=False, use_tqdm=True, config=None, initial_line_number=0, ): """ Get the MS/MS spectra from the given MGF file, optionally filtering by scan number. Parameters ---------- source : Union[IO, str] The MGF source (file name or open file object) from which the spectra are read. scan_nrs : Sequence[int] Only read spectra with the given scan numbers. If `None`, no filtering on scan number is performed. Returns ------- Iterator[SpectrumExt] An iterator over the requested spectra in the given file. """ nist_loader = NistLoader() spectra, current_line_number = nist_loader.parse_file( file, num_samples=num_samples, initial_line_number=initial_line_number, ) # check adducts # print([s['identifier'] for s in spectrums]) spectra = nist_loader.compute_all_smiles(spectra, use_tqdm=use_tqdm) # processing all_spectra = [] pp = Preprocessor() for spectrum in spectra: # use the validation from gnps format since it is the format we are parsing condition, res = LoadData.is_valid_spectrum_gnps(spectrum, config=config) # print(res) if condition: # yield spectrum['params']['name'] spec = LoadData._parse_spectrum( spectrum, compute_classes=compute_classes ) # spec = pp.preprocess_spectrum(spec) if spec is not None: all_spectra.append(spec) return all_spectra, current_line_number
[docs] def get_all_spectra_casmi( file, num_samples=10, compute_classes=False, use_tqdm=True, config=None, initial_line_number=0, ): # open casmi file with open(file, "rb") as f: spectra_df = pickle.load(f) all_spectra_parsed = [] for index, spectra_row in spectra_df.iterrows(): # initialize spectrum_dict = {} spectrum_dict["params"] = {} # get info adduct = ( " M+H" if spectra_row["prec_type"] == "[M+H]+" else spectra_row["prec_type"] ) spectrum_dict["params"]["spectrumid"] = ( str(spectra_row["casmi_id"]) + adduct ) spectrum_dict["params"]["name"] = str(spectra_row["casmi_id"]) + adduct spectrum_dict["params"]["inchi"] = "" spectrum_dict["params"]["organism"] = "casmi" spectrum_dict["params"]["id"] = spectra_row["casmi_id"] spectrum_dict["params"]["smiles"] = spectra_row["smiles"] ionmode = "Positive" if spectra_row["ion_mode"] == "P" else "Negative" spectrum_dict["params"]["ionmode"] = ionmode spectrum_dict["params"]["pepmass"] = [spectra_row["prec_mz"]] spectrum_dict["params"]["charge"] = [1] spectrum_dict["params"]["libraryquality"] = 1 # get peaks peaks = spectra_row["peaks"] mz = np.array([p[0] for p in peaks]) intensity = np.array([p[1] for p in peaks]) spectrum_dict["m/z array"] = mz spectrum_dict["intensity array"] = intensity all_spectra_parsed.append(spectrum_dict) # processing all_spectra = [] pp = Preprocessor() for spectrum in all_spectra_parsed: # use the validation from gnps format since it is the format we are parsing condition, res = LoadData.is_valid_spectrum_gnps(spectrum, config=config) # print(res) if condition: # yield spectrum['params']['name'] spec = LoadData._parse_spectrum( spectrum, compute_classes=compute_classes ) # spec = pp.preprocess_spectrum(spec) if spec is not None: all_spectra.append(spec) return all_spectra
[docs] def get_all_spectra( file: IO | str, num_samples: int = 10, compute_classes: bool = False, use_tqdm: bool = True, use_nist: bool = False, config: Config = None, use_janssen: bool = False, ) -> list[SpectrumExt]: """ Get the MS/MS spectra from the given MGF or NIST file, optionally filtering by scan number. Parameters ---------- file : Union[IO, str] The MGF or NIST file (file name or open file object) from which the spectra are read. num_samples : int The maximum number of spectra to read. If -1, all spectra are read. compute_classes : bool Whether to compute chemical superclass, class and subclass of the molecules using Classyfire. use_tqdm : bool Whether to display a progress bar using tqdm. use_nist : bool Whether the file is a NIST file. If `False`, it is assumed to be an MGF file. config : Config Configuration object containing parameters for preprocessing. use_janssen : bool Whether the MGF file follows the Janssen format. If `False`, it is assumed to follow the GNPS format. Returns ------- List[SpectrumExt] A list of the parsed spectra. """ if use_janssen: spectra = LoadData.get_all_spectra_mgf( file=file, num_samples=num_samples, compute_classes=compute_classes, use_tqdm=use_tqdm, config=config, use_gnps_format=False, ) # use format from Janssen elif use_nist: spectra, _ = LoadData.get_all_spectra_nist( file=file, num_samples=num_samples, compute_classes=compute_classes, use_tqdm=use_tqdm, config=config, ) else: spectra = LoadData.get_all_spectra_mgf( file=file, num_samples=num_samples, compute_classes=compute_classes, use_tqdm=use_tqdm, config=config, use_gnps_format=True, ) return spectra