-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Saturated Ring Sampling Conformation Feature #34
base: master
Are you sure you want to change the base?
Changes from 6 commits
1ede47a
99c5fc8
e474dc4
d763851
9fb8c35
4ff1122
8671e46
999fe40
3efb624
3b04c35
cfea4f9
9af0c2a
b214983
b1ae4b0
105223b
4bb4515
a2c28f9
b28dae4
558541d
5ce28fc
6f1e015
66ddfe3
1389e70
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,7 +8,10 @@ | |
RDKitMolCreate) | ||
from rdkit import Chem | ||
from rdkit.Chem import AllChem | ||
|
||
import numpy as np | ||
from sklearn.cluster import AgglomerativeClustering | ||
from rdkit.Chem.rdMolAlign import AlignMolConformers | ||
from typing import Optional, Iterator | ||
|
||
def cpu_type(x): | ||
return max(1, min(int(x), cpu_count())) | ||
|
@@ -35,43 +38,222 @@ def mol_from_smi_or_molblock(ligand_string): | |
|
||
|
||
def mk_prepare_ligand(mol, verbose=False): | ||
pdbqt_string_list = [] | ||
preparator = MoleculePreparation(hydrate=False, flexible_amides=False, rigid_macrocycles=True, min_ring_size=7, max_ring_size=33) | ||
try: | ||
mol_setups = preparator.prepare(mol) | ||
for setup in mol_setups: | ||
pdbqt_string, is_ok, error_msg = PDBQTWriterLegacy.write_string(setup) | ||
if verbose: | ||
print(f"{setup}") | ||
cids = [x.GetId() for x in mol.GetConformers()] | ||
for cid in cids: | ||
mol_setups = preparator.prepare(mol, conformer_id=cid) | ||
|
||
for setup in mol_setups: | ||
pdbqt_string, is_ok, error_msg = PDBQTWriterLegacy.write_string(setup) | ||
if not is_ok: | ||
print(f"{mol.GetProp('_Name')} has error in converting to pdbqt: {error_msg}") | ||
pdbqt_string_list.append(pdbqt_string) | ||
if verbose: | ||
print(f"{setup}") | ||
except Exception: | ||
sys.stderr.write( | ||
"Warning. Incorrect mol object to convert to pdbqt. Continue. \n" | ||
) | ||
traceback.print_exc() | ||
pdbqt_string = None | ||
|
||
return pdbqt_string | ||
return pdbqt_string_list | ||
|
||
|
||
def GetConformerRMSFromAtomIds(mol: Chem.Mol, confId1: int, confId2: int, atomIds: Optional[bool | list[int]]=None, prealigned: bool=False) -> float: | ||
""" Returns the RMS between two conformations based on the atomIds passed as the input. | ||
By default, the conformers will be aligned to the first conformer | ||
before the RMS calculation and, as a side-effect, the second will be left | ||
in the aligned state. | ||
|
||
Arguments: | ||
- mol: the molecule | ||
- confId1: the id of the first conformer | ||
- confId2: the id of the second conformer | ||
- atomIds: (optional) list of atom ids to use a points for | ||
alingment **AND RMSD** calculation- defaults to all atoms | ||
- prealigned: (optional) by default the conformers are assumed | ||
be unaligned and the second conformer be aligned | ||
to the first | ||
|
||
""" | ||
# align the conformers if necessary | ||
# Note: the reference conformer is always the first one | ||
if not prealigned: | ||
if atomIds: | ||
AlignMolConformers(mol, confIds=[confId1, confId2], atomIds=atomIds) | ||
else: | ||
AlignMolConformers(mol, confIds=[confId1, confId2]) | ||
|
||
# calculate the RMS between the two conformations | ||
conf1 = mol.GetConformer(id=confId1) | ||
conf2 = mol.GetConformer(id=confId2) | ||
ssr = 0 | ||
for i in atomIds: | ||
d = conf1.GetAtomPosition(i).Distance(conf2.GetAtomPosition(i)) | ||
ssr += d * d | ||
ssr /= mol.GetNumAtoms() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would add if statement to split the code execution and cover the previous behavior if atomsIds:
# new code
else:
# old code |
||
return np.sqrt(ssr) | ||
|
||
def GetConformerRMSMatrixForSaturatedRingMolecule(mol: Chem.Mol, atomIds:list[list[int]]=None, prealigned: bool=False) -> list[float]: | ||
""" Returns the RMS matrix of the conformers of a molecule based on the alignment and rmsd of saturated ring. | ||
The function calculates the mean of the RMSD of each saturated ring with the GetConformerRMSFromAtomIds. | ||
The alignment is done per ring (for example, three alignments are done for three saturated ring) | ||
|
||
By default, the conformers will be aligned to the first conformer | ||
before the RMS calculation and, as a side-effect, the second will be left | ||
in the aligned state. | ||
|
||
As a side-effect, the conformers will be aligned to the first | ||
conformer (i.e. the reference) and will left in the aligned state. | ||
|
||
Arguments: | ||
- mol: the molecule | ||
- atomIds: (optional) list of atom ids to use a points for | ||
alingment - defaults to all atoms | ||
- prealigned: (optional) by default the conformers are assumed | ||
be unaligned and will therefore be aligned to the | ||
first conformer | ||
|
||
Note that the returned RMS matrix is symmetrical, i.e. it is the | ||
lower half of the matrix, e.g. for 5 conformers:: | ||
|
||
rmsmatrix = [ a, | ||
b, c, | ||
d, e, f, | ||
g, h, i, j] | ||
|
||
where a is the RMS between conformers 0 and 1, b is the RMS between | ||
conformers 0 and 2, etc. | ||
This way it can be directly used as distance matrix in e.g. Butina | ||
clustering. | ||
|
||
def mol_embedding_3d(mol, seed=43): | ||
""" | ||
|
||
def gen_conf(mole, useRandomCoords, randomSeed): | ||
cmat_list = [] | ||
for atom_id_in_ring in atomIds: | ||
# if necessary, align the conformers | ||
# Note: the reference conformer is always the first one | ||
rmsvals = [] | ||
confIds = [conf.GetId() for conf in mol.GetConformers()] | ||
if not prealigned: | ||
if atom_id_in_ring: | ||
AlignMolConformers(mol, atomIds=atom_id_in_ring, RMSlist=rmsvals) | ||
else: | ||
AlignMolConformers(mol, RMSlist=rmsvals) | ||
else: # already prealigned | ||
for i in range(1, len(confIds)): | ||
rmsvals.append(GetConformerRMSFromAtomIds(mol, confIds[0], confIds[i], atomIds=atom_id_in_ring, prealigned=prealigned)) | ||
# loop over the conformations (except the reference one) | ||
cmat_per_ring = [] | ||
for i in range(1, len(confIds)): | ||
cmat_per_ring.append(rmsvals[i - 1]) | ||
for j in range(1, i): | ||
cmat_per_ring.append(GetConformerRMSFromAtomIds(mol, confIds[i], confIds[j], atomIds=atom_id_in_ring, prealigned=True)) | ||
|
||
cmat_list.append(np.array(cmat_per_ring)) | ||
|
||
cmat_list_array = np.array(cmat_list) | ||
|
||
return list(np.mean(cmat_list_array, axis=0)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not strong in numpy to write the code directly from the top of my head. So, I'll explain. If There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am still confused, but do you want to calculate the mathematical RMSD based on the total number of atoms in the ring? Say we have a five- and six- membered ring. Instead of calculating: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I propose to calculate true RMSD which will account atoms in all ring systems - the latter equation. However, we have RMSD for individual rings only. and Therefore, I suggest to recalculate them into the final RMSD by simple math operations. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. alright. I have edited the code to calculate the true RMSD based on the rdkit RMSD |
||
|
||
|
||
def mol_embedding_3d(mol: Chem.Mol, seed: int=43) -> Chem.Mol: | ||
|
||
def find_saturated_ring(mol: Chem.Mol) -> list[list[int]]: | ||
atom_list = mol.GetAtoms() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
ssr = Chem.GetSymmSSSR(mol) | ||
saturated_ring_list = [] | ||
for ring in ssr: | ||
is_atom_saturated_array = np.array([atom_list[atom_id].GetHybridization() == Chem.HybridizationType.SP3 for atom_id in ring]) | ||
is_ring_unsaturated = np.any(np.nonzero(is_atom_saturated_array==0)) | ||
if is_ring_unsaturated: | ||
continue | ||
|
||
saturated_ring_list.append(ring) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for ring in ssr:
is_atom_saturated_array = [mol.GetAtomWithIdx(atom_id).GetHybridization() == Chem.HybridizationType.SP3 for atom_id in ring])
if any(is_atom_saturated_array):
saturated_ring_list.append(ring) This seems more compact and expressive |
||
|
||
return saturated_ring_list | ||
|
||
def gen_conf(mole: Chem.Mol, useRandomCoords: bool, randomSeed: int, has_saturated_ring: bool) -> tuple[Chem.Mol, float]: | ||
params = AllChem.ETKDGv3() | ||
params.useRandomCoords = useRandomCoords | ||
params.randomSeed = randomSeed | ||
conf_stat = AllChem.EmbedMolecule(mole, params) | ||
if has_saturated_ring: | ||
#10 is used as default according to the C language documentation iirc, but I have to specify the numbers. | ||
conf_stat = AllChem.EmbedMultipleConfs(mole, 10, params) | ||
else: | ||
conf_stat = AllChem.EmbedMolecule(mole, params) | ||
return mole, conf_stat | ||
|
||
def remove_confs_rms(mol: Chem.Mol, saturated_ring_list: list[list[int]], rms: float=0.25, keep_nconf: Optional[bool | int]=None) -> Chem.Mol: | ||
""" | ||
The function uses AgglomerativeClustering to select conformers. | ||
|
||
:param mol: input molecule with multiple conformers | ||
:param rms: discard conformers which are closer than given value to a kept conformer | ||
:param keep_nconf: keep at most the given number of conformers. This parameter has precedence over rms | ||
:return: | ||
""" | ||
|
||
def gen_ids(ids: int) -> Iterator[int, int]: | ||
for i in range(1, len(ids)): | ||
for j in range(0, i): | ||
yield j, i | ||
|
||
if keep_nconf and mol.GetNumConformers() <= keep_nconf: | ||
return mol | ||
|
||
if mol.GetNumConformers() <= 1: | ||
return mol | ||
|
||
mol_tmp = Chem.RemoveHs(mol) # calc rms for heavy atoms only | ||
rms_ = GetConformerRMSMatrixForSaturatedRingMolecule(mol_tmp, atomIds=saturated_ring_list, prealigned=False) | ||
|
||
cids = [c.GetId() for c in mol_tmp.GetConformers()] | ||
arr = np.zeros((len(cids), len(cids))) | ||
for (i, j), v in zip(gen_ids(cids), rms_): | ||
arr[i, j] = v | ||
arr[j, i] = v | ||
if keep_nconf: | ||
cl = AgglomerativeClustering(n_clusters=keep_nconf, linkage='complete', metric='precomputed').fit(arr) | ||
else: | ||
cl = AgglomerativeClustering(n_clusters=None, linkage='complete', metric='precomputed', distance_threshold=rms).fit(arr) | ||
|
||
keep_ids = [] | ||
for i in set(cl.labels_): | ||
ids = np.where(cl.labels_ == i)[0] | ||
j = arr[np.ix_(ids, ids)].mean(axis=0).argmin() | ||
keep_ids.append(cids[j]) | ||
remove_ids = set(cids) - set(keep_ids) | ||
|
||
for cid in sorted(remove_ids, reverse=True): | ||
mol.RemoveConformer(cid) | ||
|
||
return mol | ||
|
||
|
||
if not isinstance(mol, Chem.Mol): | ||
return None | ||
|
||
saturated_ring = find_saturated_ring(mol) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suggest to rename the variable and the function and add |
||
has_saturated_ring = (len(saturated_ring)>0) | ||
|
||
mol = Chem.AddHs(mol, addCoords=True) | ||
if not mol_is_3d(mol): # only for non 3D input structures | ||
mol, conf_stat = gen_conf(mol, useRandomCoords=False, randomSeed=seed) | ||
mol, conf_stat = gen_conf(mol, useRandomCoords=False, randomSeed=seed, has_saturated_ring=has_saturated_ring) | ||
if conf_stat == -1: | ||
# if molecule is big enough and rdkit cannot generate a conformation - use params.useRandomCoords = True | ||
mol, conf_stat = gen_conf(mol, useRandomCoords=True, randomSeed=seed) | ||
mol, conf_stat = gen_conf(mol, useRandomCoords=True, randomSeed=seed, has_saturated_ring=has_saturated_ring) | ||
if conf_stat == -1: | ||
return None | ||
AllChem.UFFOptimizeMolecule(mol, maxIters=100) | ||
print(f"[For Testing Only] {mol.GetProp('_Name')} has {len(saturated_ring)} saturated ring") | ||
print(f"[For Testing Only] Before removing conformation: {mol.GetProp('_Name')} has {mol.GetNumConformers()} conf") | ||
mol = remove_confs_rms(mol, saturated_ring) | ||
print(f"[For Testing Only] After removing conformation: {mol.GetProp('_Name')} has {mol.GetNumConformers()} conf") | ||
|
||
return mol | ||
|
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -75,51 +75,63 @@ def mol_dock(mol, config): | |
|
||
config = __parse_config(config) | ||
|
||
|
||
|
||
mol_id = mol.GetProp('_Name') | ||
ligand_pdbqt = ligand_preparation(mol, boron_replacement=True) | ||
if ligand_pdbqt is None: | ||
ligand_pdbqt_list = ligand_preparation(mol, boron_replacement=True) | ||
|
||
if ligand_pdbqt_list is None: | ||
return mol_id, None | ||
|
||
output_fd, output_fname = tempfile.mkstemp(suffix='_output.json', text=True) | ||
ligand_fd, ligand_fname = tempfile.mkstemp(suffix='_ligand.pdbqt', text=True) | ||
|
||
try: | ||
with open(ligand_fname, 'wt') as f: | ||
f.write(ligand_pdbqt) | ||
|
||
p = os.path.realpath(__file__) | ||
python_exec = sys.executable | ||
cmd = f'{python_exec} {os.path.dirname(p)}/vina_dock_cli.py -l {ligand_fname} -p {config["protein"]} ' \ | ||
f'-o {output_fname} --center {" ".join(map(str, config["center"]))} ' \ | ||
f'--box_size {" ".join(map(str, config["box_size"]))} ' \ | ||
f'-e {config["exhaustiveness"]} --seed {config["seed"]} --nposes {config["n_poses"]} -c {config["ncpu"]}' | ||
start_time = timeit.default_timer() | ||
subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True) # this will trigger CalledProcessError and skip next lines) | ||
dock_time = round(timeit.default_timer() - start_time, 1) | ||
|
||
with open(output_fname) as f: | ||
res = f.read() | ||
if res: | ||
res = json.loads(res) | ||
mol_block = pdbqt2molblock(res['poses'].split('MODEL')[1], mol, mol_id) | ||
output = {'docking_score': res['docking_score'], | ||
'pdb_block': res['poses'], | ||
'mol_block': mol_block, | ||
'dock_time': dock_time} | ||
|
||
except subprocess.CalledProcessError as e: | ||
sys.stderr.write(f'Error caused by docking of {mol_id}\n') | ||
sys.stderr.write(str(e) + '\n') | ||
sys.stderr.write('STDERR output:\n') | ||
sys.stderr.write(e.stderr + '\n') | ||
sys.stderr.flush() | ||
output = None | ||
|
||
finally: | ||
os.close(output_fd) | ||
os.close(ligand_fd) | ||
os.unlink(ligand_fname) | ||
os.unlink(output_fname) | ||
dock_output_conformer_list = [] | ||
start_time = timeit.default_timer() | ||
for ligand_pdbqt in ligand_pdbqt_list: | ||
output_fd, output_fname = tempfile.mkstemp(suffix='_output.json', text=True) | ||
ligand_fd, ligand_fname = tempfile.mkstemp(suffix='_ligand.pdbqt', text=True) | ||
|
||
try: | ||
with open(ligand_fname, 'wt') as f: | ||
f.write(ligand_pdbqt) | ||
|
||
p = os.path.realpath(__file__) | ||
python_exec = sys.executable | ||
cmd = f'{python_exec} {os.path.dirname(p)}/vina_dock_cli.py -l {ligand_fname} -p {config["protein"]} ' \ | ||
f'-o {output_fname} --center {" ".join(map(str, config["center"]))} ' \ | ||
f'--box_size {" ".join(map(str, config["box_size"]))} ' \ | ||
f'-e {config["exhaustiveness"]} --seed {config["seed"]} --nposes {config["n_poses"]} -c {config["ncpu"]}' | ||
subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True) # this will trigger CalledProcessError and skip next lines) | ||
|
||
with open(output_fname) as f: | ||
res = f.read() | ||
if res: | ||
res = json.loads(res) | ||
mol_block = pdbqt2molblock(res['poses'].split('MODEL')[1], mol, mol_id) | ||
output = {'docking_score': res['docking_score'], | ||
'pdb_block': res['poses'], | ||
'mol_block': mol_block} | ||
|
||
dock_output_conformer_list.append(output) | ||
|
||
except subprocess.CalledProcessError as e: | ||
sys.stderr.write(f'Error caused by docking of {mol_id}\n') | ||
sys.stderr.write(str(e) + '\n') | ||
sys.stderr.write('STDERR output:\n') | ||
sys.stderr.write(e.stderr + '\n') | ||
sys.stderr.flush() | ||
output = None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line is already not necessary and can be deleted. If docking was unsuccessful it will not contribute to the |
||
|
||
finally: | ||
os.close(output_fd) | ||
os.close(ligand_fd) | ||
os.unlink(ligand_fname) | ||
os.unlink(output_fname) | ||
|
||
dock_time = round(timeit.default_timer() - start_time, 1) | ||
print(f'[For Testing Only Sanity check]: There are {len(dock_output_conformer_list)} {mol_id} conformers that has been docked') | ||
print(f'\n') | ||
docking_score_list = [float(conformer_output['docking_score']) for conformer_output in dock_output_conformer_list] | ||
DrrDom marked this conversation as resolved.
Show resolved
Hide resolved
|
||
output = dock_output_conformer_list[docking_score_list.index(min(docking_score_list))] | ||
output['dock_time'] = dock_time | ||
|
||
return mol_id, output | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What will be
pdbqt_string
if notis_ok
? Is it reasonable to add this value to the list? Need to checkThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the case of the implicit hydrogen error, the
mk_prepare_ligand
return an empty string''
. So, is it better to not append the empty string and let the program continue?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it is not reasonable. Then it can be treated in this way below. If one conformer will fail, maybe some other will pass, so no need to interrupt preparation. In the worst case we will return an empty list.