import os
import numpy as np
from simba.config import Config
class LoadMCES:
    """
    Namespace of static helpers that load partitioned pair arrays and normalize them.

    The partition files are NumPy arrays with one row per pair of spectra. The
    loaders treat columns 0 and 1 as the indexes of the two spectra and the
    following columns as distances/similarities; which column holds which
    measure is taken from ``Config`` (TODO confirm the layout against Config).
    """

    @staticmethod
    def find_file(directory_path, prefix):
        """
        Recursively collect every file whose name starts with ``prefix``.

        Args:
            directory_path (str): The path of the directory to search in
                (sub-directories are searched as well).
            prefix (str): Prefix the file name has to start with.

        Returns:
            list[str]: The paths of all matching files, sorted so the result does
            not depend on the traversal order of the file system. The list is
            empty when no file matches.
        """
        return sorted(
            os.path.join(root, name)
            for root, _, names in os.walk(directory_path)
            for name in names
            if name.startswith(prefix)
        )

    @staticmethod
    def _concatenate_chunks(chunks, directory_path, prefix):
        """
        Stack the loaded partitions, failing with a readable error when there are none.

        Raises:
            ValueError: If ``chunks`` is empty (the same exception type
                ``np.concatenate`` raises for an empty list).
        """
        if not chunks:
            raise ValueError(
                f"No pair data to merge: no usable file starting with '{prefix}' "
                f"found under '{directory_path}'"
            )
        return np.concatenate(chunks, axis=0)

    @staticmethod
    def load_raw_data(directory_path, prefix, partitions=10000000):
        """
        Load the raw partition files for inspection purposes (no normalization).

        Args:
            directory_path (str): Directory that is searched recursively.
            prefix (str): Prefix of the partition file names.
            partitions (int): Maximum number of files to load.

        Returns:
            np.ndarray: The loaded arrays concatenated along axis 0, or an empty
            1-D array when no file was found.
        """
        files = LoadMCES.find_file(directory_path, prefix)
        print(directory_path)

        print("Loading the partitioned files of the pairs")
        list_arrays = []
        # max(..., 0) keeps a negative ``partitions`` meaning "load nothing"
        for i, f in enumerate(files[: max(partitions, 0)]):
            print(f"Processing batch {i}")
            np_array = np.load(f)
            print(f"Size: {np_array.shape[0]}")
            list_arrays.append(np_array)

        print("Merging")
        if list_arrays:
            return np.concatenate(list_arrays, axis=0)
        return np.array([])

    @staticmethod
    def merge_numpy_arrays_mces(directory_path, prefix, remove_percentage=0.90):
        """
        Load the partition files, subsample the low pairs, normalize and deduplicate.

        Args:
            directory_path (str): Directory that is searched recursively.
            prefix (str): Prefix of the partition file names.
            remove_percentage (float): Fraction of low pairs dropped per file.

        Returns:
            np.ndarray: Unique rows with the first three columns of every file;
            the column ``Config.COLUMN_EDIT_DISTANCE`` is normalized.

        Raises:
            ValueError: If no partition file was found.
        """
        files = LoadMCES.find_file(directory_path, prefix)

        print("Loading the partitioned files of the pairs")
        list_arrays = []
        for i, f in enumerate(files):
            print(f"Processing batch {i}")
            # keep only the first 3 columns: index0, index1 and similarity
            np_array = np.load(f)[:, 0:3]
            np_array = LoadMCES.remove_excess_low_pairs(
                np_array,
                remove_percentage=remove_percentage,
                target_column=Config.COLUMN_EDIT_DISTANCE,
            )
            list_arrays.append(np_array)

        print("Merging")
        merged_array = LoadMCES._concatenate_chunks(
            list_arrays, directory_path, prefix
        )

        print("Normalizing")
        merged_array[:, Config.COLUMN_EDIT_DISTANCE] = LoadMCES.normalize_mces(
            merged_array[:, Config.COLUMN_EDIT_DISTANCE]
        )

        print("Remove redundant pairs")
        merged_array = np.unique(merged_array, axis=0)

        print(f"Size of data loaded: {merged_array.shape[0]}")
        return merged_array

    @staticmethod
    def add_high_similarity_pairs_edit_distance(merged_array):
        """
        Append one self pair ``(i, i)`` with value 1 for every spectrum index.

        The indexes run from 0 up to and including the largest index found in the
        two index columns (columns 0 and 1).

        Args:
            merged_array (np.ndarray): 2-D array of pairs with at least 3 columns.

        Returns:
            np.ndarray: ``merged_array`` followed by the self pairs. Column 2 of
            the new rows is 1; column 3 is 1 as well when the array has exactly
            4 columns. An empty input is returned unchanged.
        """
        if merged_array.shape[0] == 0:
            return merged_array

        # +1 because the largest index itself also needs a self pair
        number_spectra = int(np.max(merged_array[:, 0:2])) + 1

        self_pairs = np.zeros((number_spectra, merged_array.shape[1]))
        self_pairs[:, 0] = np.arange(number_spectra)
        self_pairs[:, 1] = np.arange(number_spectra)
        self_pairs[:, 2] = 1

        # if there is the extra column corresponding to tanimoto
        if merged_array.shape[1] == 4:
            self_pairs[:, 3] = 1

        return np.concatenate([merged_array, self_pairs])

    @staticmethod
    def merge_numpy_arrays_edit_distance(
        directory_path, prefix, remove_percentage=0.90
    ):
        """
        Load the partition files, subsample the low pairs, normalize and add self pairs.

        Args:
            directory_path (str): Directory that is searched recursively.
            prefix (str): Prefix of the partition file names.
            remove_percentage (float): Fraction of low pairs dropped per file.

        Returns:
            np.ndarray: The first three columns of every file with column 2
            normalized, followed by the self pairs.

        Raises:
            ValueError: If no partition file was found.
        """
        files = LoadMCES.find_file(directory_path, prefix)

        print("Loading the partitioned files of the pairs")
        list_arrays = []
        for i, f in enumerate(files):
            print(f"Processing batch {i}")
            # keep only the first 3 columns: index0, index1 and similarity
            np_array = np.load(f)[:, 0:3]
            print(f"Size without removal: {np_array.shape[0]}")
            np_array = LoadMCES.remove_excess_low_pairs(
                np_array, remove_percentage=remove_percentage
            )
            print(f"Size with removal: {np_array.shape[0]}")
            list_arrays.append(np_array)

        print("Merging")
        merged_array = LoadMCES._concatenate_chunks(
            list_arrays, directory_path, prefix
        )

        print("Normalizing")
        merged_array[:, 2] = LoadMCES.normalize_mces(merged_array[:, 2])

        # add the high similarity pairs
        return LoadMCES.add_high_similarity_pairs_edit_distance(merged_array)

    @staticmethod
    def merge_numpy_arrays_multitask(
        directory_path,
        prefix,
        remove_percentage=0.00,
        add_high_similarity_pairs=False,
        normalize_mces=True,
        normalize_ed=True,
    ):
        """
        Load the partition files keeping all columns and normalize both targets.

        Args:
            directory_path (str): Directory that is searched recursively.
            prefix (str): Prefix of the partition file names.
            remove_percentage (float): Fraction of low pairs dropped per file.
            add_high_similarity_pairs (bool): Append the self pairs at the end.
            normalize_mces (bool): Normalize the column ``COLUMN_MCES20``; this
                is skipped when the configuration has ``USE_TANIMOTO`` set.
            normalize_ed (bool): Normalize the column ``COLUMN_EDIT_DISTANCE``.

        Returns:
            np.ndarray: All non-empty partitions concatenated along axis 0.

        Raises:
            ValueError: If there is no non-empty partition to merge.
        """
        config = Config()
        files = LoadMCES.find_file(directory_path, prefix)

        print("Loading the partitioned files of the pairs")
        pair_distances_chunks = []
        for i, f in enumerate(files):
            print(f"Processing batch {i}")
            pair_distances = LoadMCES.remove_excess_low_pairs(
                np.load(f),
                remove_percentage=remove_percentage,
                target_column=config.COLUMN_EDIT_DISTANCE,
            )
            pair_distances_chunks.append(pair_distances)

        print("Merging")
        all_pair_distances = LoadMCES._concatenate_chunks(
            [chunk for chunk in pair_distances_chunks if len(chunk) > 0],
            directory_path,
            prefix,
        )

        print("Normalizing")
        if normalize_ed:
            all_pair_distances[:, config.COLUMN_EDIT_DISTANCE] = LoadMCES.normalize_ed(
                all_pair_distances[:, config.COLUMN_EDIT_DISTANCE],
            )

        if normalize_mces and not config.USE_TANIMOTO:
            # if not using tanimoto normalize between 0 and 1
            all_pair_distances[:, config.COLUMN_MCES20] = LoadMCES.normalize_mces20(
                all_pair_distances[:, config.COLUMN_MCES20],
                max_value=config.MCES20_MAX_VALUE,
                remove_negative_values=True,
            )

        # add the high similarity pairs
        if add_high_similarity_pairs:
            all_pair_distances = LoadMCES.add_high_similarity_pairs_edit_distance(
                all_pair_distances
            )

        print(f"Number of pairs loaded: {all_pair_distances.shape[0]} ")
        return all_pair_distances

    @staticmethod
    def merge_numpy_arrays(
        directory_path,
        prefix,
        use_edit_distance,
        use_multitask=False,
        add_high_similarity_pairs=False,
        remove_percentage=0,
        normalize_mces=True,
        normalize_ed=True,
    ):
        """
        Dispatch to the loader that matches the requested training target.

        ``add_high_similarity_pairs``, ``remove_percentage``, ``normalize_mces``
        and ``normalize_ed`` are only forwarded to the multitask loader. The
        single-task loaders are called with their own defaults, so these
        arguments have no effect when ``use_multitask`` is False.

        Args:
            directory_path (str): Directory that is searched recursively.
            prefix (str): Prefix of the partition file names.
            use_edit_distance (bool): Without multitask, selects the edit
                distance loader (True) or the MCES loader (False).
            use_multitask (bool): Use the multitask loader.

        Returns:
            np.ndarray: The array produced by the selected loader.
        """
        if use_multitask:
            return LoadMCES.merge_numpy_arrays_multitask(
                directory_path,
                prefix,
                add_high_similarity_pairs=add_high_similarity_pairs,
                remove_percentage=remove_percentage,
                normalize_mces=normalize_mces,
                normalize_ed=normalize_ed,
            )

        if use_edit_distance:
            return LoadMCES.merge_numpy_arrays_edit_distance(directory_path, prefix)

        return LoadMCES.merge_numpy_arrays_mces(directory_path, prefix)

    @staticmethod
    def remove_excess_low_pairs(
        indexes_tani, remove_percentage=0.95, max_value=5, target_column=2
    ):
        """
        Randomly drop a fraction of the low pairs to reduce the data loaded.

        A pair is "low" when its value in ``target_column`` is ``>= max_value``
        and "high" when it is ``< max_value`` (the values are treated as
        distances). High pairs are always kept.

        Args:
            indexes_tani (np.ndarray): 2-D array with one pair per row.
            remove_percentage (float): Fraction of the low pairs to drop. Values
                ``<= 0`` keep every low pair.
            max_value (float): Threshold separating high and low pairs.
            target_column (int): Column holding the value that is thresholded.

        Returns:
            np.ndarray: The retained low pairs followed by all high pairs.
        """
        print(f"Shape of data loaded from folder: {indexes_tani.shape[0]}")

        # filter by high or low similarity, assuming MCES distance
        indexes_tani_high = indexes_tani[indexes_tani[:, target_column] < max_value]
        indexes_tani_low = indexes_tani[indexes_tani[:, target_column] >= max_value]

        # the sample size refers to the low pairs only; clamp it so that a
        # remove_percentage above 1 simply drops every low pair
        number_low = indexes_tani_low.shape[0]
        sample_size = max(number_low - int(remove_percentage * number_low), 0)
        print(
            f"indexes_tani_low.shape[0]: {indexes_tani_low.shape[0]}, sample_size:{sample_size}"
        )

        if remove_percentage > 0 and number_low > 0:
            # sample without replacement so that no pair is duplicated
            random_samples = np.random.choice(
                number_low, size=sample_size, replace=False
            )
            indexes_tani_low = indexes_tani_low[random_samples]

        return np.concatenate((indexes_tani_low, indexes_tani_high), axis=0)

    @staticmethod
    def normalize_ed(ed, max_ed=5):
        """
        Turn edit distances into similarities: the higher the distance the lower the value.

        Args:
            ed (np.ndarray): Edit distances; the input is not modified.
            max_ed (float): Distances at or above this value are capped to it.

        Returns:
            np.ndarray: ``1 - min(ed, max_ed) / max_ed``, i.e. 1 for distance 0
            and 0 for distances of at least ``max_ed``.
        """
        print(f"Example of input ed: {ed}")
        ed_normalized = ed.copy()
        ed_normalized[ed_normalized >= max_ed] = max_ed
        ed_normalized = 1 - (ed_normalized / max_ed)
        print(f"Example of normalized ed: {ed_normalized}")
        return ed_normalized

    @staticmethod
    def normalize_mces(mces, max_mces=5):
        """
        Alias of :meth:`normalize_ed` used by the single-task loaders.

        NOTE(review): ``merge_numpy_arrays_mces`` and
        ``merge_numpy_arrays_edit_distance`` call ``LoadMCES.normalize_mces``;
        this assumes ``normalize_ed`` is the renamed version of that method --
        confirm.

        Args:
            mces (np.ndarray): Distances to normalize; the input is not modified.
            max_mces (float): Distances at or above this value are capped to it.

        Returns:
            np.ndarray: The result of ``normalize_ed(mces, max_ed=max_mces)``.
        """
        return LoadMCES.normalize_ed(mces, max_ed=max_mces)

    @staticmethod
    def normalize_mces20(mcs20, max_value, remove_negative_values=True):
        """
        Turn MCES distances into similarities: the higher the distance the lower the value.

        Args:
            mcs20 (np.ndarray): MCES distances; the input is not modified.
            max_value (float): Distance that maps to a similarity of 0.
            remove_negative_values (bool): Set results below 0 (distances above
                ``max_value``) to 0.

        Returns:
            np.ndarray: ``1 - mcs20 / max_value``, optionally floored at 0.
        """
        mcs20_normalized = 1 - mcs20 / max_value
        if remove_negative_values:
            mcs20_normalized[mcs20_normalized < 0] = 0
        return mcs20_normalized

    @staticmethod
    def load_mces_20_data(directory_path, prefix, number_folders):
        """
        Load the mces with threshold 20 across different folders.

        The folders are ``directory_path + "0"``, ``directory_path + "1"``, ...
        up to ``number_folders - 1``.

        Args:
            directory_path (str): Common path prefix of the numbered folders.
            prefix (str): Prefix of the partition file names.
            number_folders (int): Number of folders to read.

        Returns:
            np.ndarray: The data of all folders concatenated along axis 0, or an
            empty 1-D array when no folder contains data.
        """
        list_arrays = []
        for index in range(number_folders):
            array = LoadMCES.load_raw_data(
                directory_path=directory_path + str(index), prefix=prefix
            )
            # drop the folders that are empty
            if array.shape[0] > 0:
                list_arrays.append(array)

        if not list_arrays:
            return np.array([])
        return np.concatenate(list_arrays, axis=0)