Fix testing #860

Open · wants to merge 6 commits into master · showing changes from all commits
14 changes: 12 additions & 2 deletions Makefile
@@ -104,10 +104,20 @@ check_build_reqs:


prepare: check_venv
$(pip) install numpy

# TODO scikit-learn can't even begin to install unless numpy is already there, so numpy has to be first and by itself.
# See https://github.com/scikit-learn/scikit-learn/issues/4164
$(pip) install scipy scikit-learn
$(pip) install numpy
# scikit-learn needs Cython to build from source (which we *shouldn't* do), but doesn't require it in a way that lets pip know to install it.
$(pip) install "cython>=0.28.5"
$(pip) install scipy "scikit-learn>=0.22.1,<=2.0"
# To install PyVCF, we need a version of setuptools before 58. PyVCF
# doesn't have the necessary build requirement information because its last
# release predates setuptools' breaking change by several years.
$(pip) install "setuptools<58"
# The old docker-py that comes with the old Toil we use needs an old
# requests. See https://github.com/docker/docker-py/issues/3256.
$(pip) install "requests<2.32.0"
$(pip) install pytest 'toil[aws,mesos]==4.1.0' biopython pyvcf
pip list
clean_prepare: check_venv
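
A quick way to sanity-check the pins that `prepare` sets up — this is a sketch, not part of the PR, and it assumes Python 3.8+ (for importlib.metadata); the use_2to3 explanation for the setuptools pin is an inference from setuptools 58's release notes, not something the Makefile states:

# Sketch: verify the version pins that `make prepare` relies on.
from importlib.metadata import version

def as_tuple(v):
    # Turn '57.5.0' into (57, 5, 0) for comparison; good enough for a sanity check.
    return tuple(int(part) for part in v.split('.') if part.isdigit())

# setuptools 58 removed use_2to3, which PyVCF's last release still relied on.
assert as_tuple(version('setuptools')) < (58,)
# requests 2.32 broke the docker-py version pulled in by toil 4.1.0.
assert as_tuple(version('requests')) < (2, 32, 0)
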
2 changes: 1 addition & 1 deletion scripts/construct_bakeoff_graphs.py
@@ -4,7 +4,7 @@
found here s3://vg-data/bakeoff/
The input FASTAs and VCFs are expected to be there already.
Assumes you have authenticated S3 access configured. If not, the files are
mirrored to https://courtyard.gi.ucsc.edu/~anovak/vg-data/bakeoff/
mirrored to https://public.gi.ucsc.edu/~anovak/vg-data/bakeoff/
"""

import os, sys, subprocess
5 changes: 3 additions & 2 deletions src/toil_vg/test/test_vg.py
@@ -40,9 +40,9 @@ def _ci_input_path(self, filename):
Get the URL from which an input file can be obtained.
"""
# /public/groups/vg/vg-data on Courtyard is served as
# https://courtyard.gi.ucsc.edu/~anovak/vg-data/. These are also the
# https://public.gi.ucsc.edu/~anovak/vg-data/. These are also the
# files from the s3://vg-data bucket.
return 'https://courtyard.gi.ucsc.edu/~anovak/vg-data/toil_vg_ci/{}'.format(filename)
return 'https://public.gi.ucsc.edu/~anovak/vg-data/toil_vg_ci/{}'.format(filename)

def _download_input(self, filename, local_path = None):
# Where should we put this input file?
@@ -900,6 +900,7 @@ def test_20_sv_genotyping_giraffe(self):

self._run(['toil-vg', 'index', self.jobStoreLocal, self.local_outstore,
'--container', self.containerType,
'--realTimeLogging',
'--graphs', os.path.join(self.local_outstore, 'HGSVC_chr21.vg'), os.path.join(self.local_outstore, 'HGSVC_chr22.vg'),
'--chroms', 'chr21', 'chr22',
'--vcf_phasing', vcf_path,
2 changes: 1 addition & 1 deletion src/toil_vg/vg_common.py
@@ -392,7 +392,7 @@ def call_with_docker(self, job, args, work_dir, outfile, errfile, check_output,
connection_sock.settimeout(10)

# Check the security cookie
received_cookie_and_newline = connection_sock.recv(len(security_cookie) + 1)
received_cookie_and_newline = connection_sock.recv(len(security_cookie) + 1).decode('utf-8', errors='replace')

if received_cookie_and_newline != security_cookie + "\n":
# Incorrect security cookie.
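
For context on the one-line change above: on Python 3, socket.recv() returns bytes, so the old comparison against a str cookie could never succeed. A minimal sketch of the pitfall (not from the PR; the cookie value is made up):

cookie = "s3kr1t"
received = b"s3kr1t\n"  # what connection_sock.recv() actually hands back
assert received != cookie + "\n"  # always unequal: bytes never equal str
assert received.decode('utf-8', errors='replace') == cookie + "\n"  # the fix
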
4 changes: 2 additions & 2 deletions src/toil_vg/vg_config.py
@@ -161,7 +161,7 @@
## of through docker.

# Docker image to use for vg
vg-docker: 'quay.io/vgteam/vg:ci-309-3db0ee65a49f4fb619263d01b62c5a5ea8aac497'
vg-docker: 'quay.io/vgteam/vg:ci-1578-a4e7ddd41774d8431160490db26670514c3fa921'

# Docker image to use for bcftools
bcftools-docker: 'quay.io/biocontainers/bcftools:1.9--h4da6232_0'
@@ -491,7 +491,7 @@
## of through docker.

# Docker image to use for vg
vg-docker: 'quay.io/vgteam/vg:ci-309-3db0ee65a49f4fb619263d01b62c5a5ea8aac497'
vg-docker: 'quay.io/vgteam/vg:ci-1578-a4e7ddd41774d8431160490db26670514c3fa921'

# Docker image to use for bcftools
bcftools-docker: 'quay.io/biocontainers/bcftools:1.9--h4da6232_0'
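
To confirm which vg image a generated config actually pins, a sketch like the following works; it assumes PyYAML is installed and that config.yaml came from toil-vg's config generator:

import yaml

with open('config.yaml') as f:
    cfg = yaml.safe_load(f)
# Expect the new pin from this PR:
# quay.io/vgteam/vg:ci-1578-a4e7ddd41774d8431160490db26670514c3fa921
print(cfg['vg-docker'])
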
131 changes: 78 additions & 53 deletions src/toil_vg/vg_index.py
@@ -500,39 +500,14 @@ def run_xg_indexing(job, context, inputGraphFileIDs, graph_names, index_name,
job.fileStore.readGlobalFile(graph_id, graph_filename)
graph_filenames.append(os.path.basename(graph_filename))

# If we have a separate GBWT it will go here
gbwt_filename = os.path.join(work_dir, "{}.gbwt".format(index_name))
# And if we have a separate thread db it will go here
thread_db_filename = os.path.join(work_dir, "{}.threads".format(index_name))

# Get the vcf file for loading phasing info
if vcf_phasing_file_id:
phasing_file = os.path.join(work_dir, 'phasing.{}.vcf.gz'.format(index_name))
job.fileStore.readGlobalFile(vcf_phasing_file_id, phasing_file)
job.fileStore.readGlobalFile(tbi_phasing_file_id, phasing_file + '.tbi')
phasing_opts = ['-v', os.path.basename(phasing_file)]

if make_gbwt:
# Write the haplotype index to its own file
phasing_opts += ['--gbwt-name', os.path.basename(gbwt_filename)]

for region in gbwt_regions:
phasing_opts += ['--region', region]

if context.config.force_phasing:
# We need to discard overlaps also to really get rid of haplotype breaks.
phasing_opts += ['--force-phasing', '--discard-overlaps']
else:
phasing_opts = []

# Where do we put the XG index?
xg_filename = "{}.xg".format(index_name)

# Now run the indexer.
RealtimeLogger.info("XG Indexing {}".format(str(graph_filenames)))

command = ['vg', 'index', '--threads', str(job.cores), '--xg-name', os.path.basename(xg_filename)]
command += phasing_opts + graph_filenames
command += graph_filenames
command += ['--temp-dir', os.path.join('.', os.path.basename(index_temp_dir))]

if include_alt_paths:
@@ -555,11 +530,64 @@
# Determine if we want to checkpoint index to output store
write_function = context.write_intermediate_file if intermediate else context.write_output_file
xg_file_id = write_function(job, os.path.join(work_dir, xg_filename))


# We can no longer make the GBWT in the vg index call after https://github.com/vgteam/vg/pull/4433/.
# Also, vg gbwt needs a graph all in one file, with alt paths.
gbwt_file_id = None
thread_db_file_id = None
if make_gbwt and vcf_phasing_file_id:
# Also save the GBWT if it was generated
if make_gbwt:

# We need a VCF to do this.
assert vcf_phasing_file_id is not None
assert tbi_phasing_file_id is not None
phasing_file = os.path.join(work_dir, 'phasing.{}.vcf.gz'.format(index_name))
job.fileStore.readGlobalFile(vcf_phasing_file_id, phasing_file)
job.fileStore.readGlobalFile(tbi_phasing_file_id, phasing_file + '.tbi')

# Determine where to put the GBWT
gbwt_filename = os.path.join(work_dir, "{}.gbwt".format(index_name))

# We need a single input graph with alt paths
if include_alt_paths:
# We already have this
single_input_graph_filename = xg_filename
elif len(graph_filenames) == 1:
# We have just one input graph and don't need to combine.
single_input_graph_filename = graph_filenames[0]
else:
# We need to make a combined graph.
single_input_graph_filename = os.path.join(work_dir, "combined_with_alts.vg")
# We assume the IDs are already joined.
combine_command = ['vg', 'combine'] + graph_filenames
try:
context.runner.call(job, combine_command, work_dir=work_dir, outfile=open(single_input_graph_filename, 'wb'))
except:
# Dump everything we need to replicate the index run
logging.error("Graph combining failed. Dumping files.")
for graph_filename in graph_filenames:
context.write_output_file(job, os.path.join(work_dir, graph_filename))
raise

gbwt_command = ['vg', 'gbwt', '-x', os.path.basename(single_input_graph_filename), '-v', os.path.basename(phasing_file), '-o', os.path.basename(gbwt_filename)]

for region in gbwt_regions:
gbwt_command += ['--vcf-region', region]

if context.config.force_phasing:
# We need to discard overlaps also to really get rid of haplotype breaks.
gbwt_command += ['--force-phasing', '--discard-overlaps']

try:
context.runner.call(job, gbwt_command, work_dir=work_dir)
except:
# Dump everything we need to replicate the index run
logging.error("GBWT indexing failed. Dumping files.")

context.write_output_file(job, phasing_file)
context.write_output_file(job, phasing_file + '.tbi')
context.write_output_file(job, single_input_graph_filename)

raise

gbwt_file_id = write_function(job, gbwt_filename)

end_time = timeit.default_timer()
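
Outside of toil-vg, the standalone GBWT build this hunk performs boils down to two commands. A sketch with subprocess, assuming vg is on PATH, the inputs carry alt paths, and the per-chromosome graphs already share a joined ID space (which the code above assumes was established earlier, e.g. with vg ids -j):

import subprocess

graphs = ['chr21.vg', 'chr22.vg']  # hypothetical inputs

# Combine the per-chromosome graphs into the single file vg gbwt needs.
with open('combined_with_alts.vg', 'wb') as out:
    subprocess.run(['vg', 'combine'] + graphs, stdout=out, check=True)

# Build the haplotype GBWT from the combined graph and the phased VCF.
subprocess.run(['vg', 'gbwt', '-x', 'combined_with_alts.vg',
                '-v', 'phasing.vcf.gz', '-o', 'graph.gbwt'], check=True)
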
@@ -704,12 +732,9 @@ def run_snarl_indexing(job, context, inputGraphFileIDs, graph_names, index_name=

return snarl_file_id

def run_distance_indexing(job, context, input_xg_id, input_trivial_snarls_id, index_name=None, max_distance_threshold=0):
def run_distance_indexing(job, context, input_xg_id, index_name=None, max_distance_threshold=0):
"""
Make a distance index from the given XG index and the given snarls file,
including the trivial snarls.

TODO: also support a single VG file and snarls.
Make a distance index from the given XG index.

If index_name is not None, saves it as <index_name>.dist to the output
store.
@@ -730,14 +755,12 @@ def run_distance_indexing(job, context, input_xg_id, input_trivial_snarls_id, in
# Download the input files.
xg_filename = os.path.join(work_dir, 'graph.xg')
job.fileStore.readGlobalFile(input_xg_id, xg_filename)
trivial_snarls_filename = os.path.join(work_dir, 'graph.trivial.snarls')
job.fileStore.readGlobalFile(input_trivial_snarls_id, trivial_snarls_filename)

# Where do we put the distance index?
distance_filename = os.path.join(work_dir, (index_name if index_name is not None else 'graph') + '.dist')

cmd = ['vg', 'index', '-t', max(1, int(job.cores)), '-j', os.path.basename(distance_filename),
'-x', os.path.basename(xg_filename), '-s', os.path.basename(trivial_snarls_filename)]
'-x', os.path.basename(xg_filename)]

if max_distance_threshold > 0:
# Add a max distance index with this limit.
@@ -751,7 +774,6 @@ def run_distance_indexing(job, context, input_xg_id, input_trivial_snarls_id, in
# Dump everything we need to replicate the indexing
logging.error("Distance indexing failed. Dumping files.")
context.write_output_file(job, xg_filename)
context.write_output_file(job, trivial_snarls_filename)
if os.path.exists(distance_filename):
context.write_output_file(job, distance_filename)
raise
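
The net effect of these hunks is that the distance index is now built from the XG alone. Roughly, as a standalone command (a sketch assuming vg on PATH; file names are placeholders, not an excerpt of this code):

import subprocess

subprocess.run(['vg', 'index', '-t', '4',
                '-j', 'graph.dist',  # distance index output
                '-x', 'graph.xg'],   # no trivial-snarls input anymore
               check=True)
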
@@ -770,7 +792,7 @@ def run_distance_indexing(job, context, input_xg_id, input_trivial_snarls_id, in

return distance_file_id

def run_minimizer_indexing(job, context, input_xg_id, input_gbwt_id, index_name=None):
def run_minimizer_indexing(job, context, input_xg_id, input_distance_id, input_gbwt_id, index_name=None):
"""
Make a minimizer index file for the graph and haplotypes described by the
given input XG and GBWT indexes.
@@ -796,13 +818,16 @@ def run_minimizer_indexing(job, context, input_xg_id, input_gbwt_id, index_name=
# Download the input files.
xg_filename = os.path.join(work_dir, 'graph.xg')
job.fileStore.readGlobalFile(input_xg_id, xg_filename)
distance_filename = os.path.join(work_dir, 'graph.dist')
job.fileStore.readGlobalFile(input_distance_id, distance_filename)
gbwt_filename = os.path.join(work_dir, 'graph.gbwt')
job.fileStore.readGlobalFile(input_gbwt_id, gbwt_filename)

# Where do we put the minimizer index?
minimizer_filename = os.path.join(work_dir, (index_name if index_name is not None else 'graph') + '.min')

cmd = ['vg', 'minimizer', '-t', max(1, int(job.cores)), '-i', os.path.basename(minimizer_filename),
'--distance-index', os.path.basename(distance_filename),
'-g', os.path.basename(gbwt_filename)] + context.config.minimizer_opts + [os.path.basename(xg_filename)]
try:
# Compute the index to the correct file
@@ -811,8 +836,8 @@ def run_minimizer_indexing(job, context, input_xg_id, input_gbwt_id, index_name=
# Dump everything we need to replicate the indexing
logging.error("Minimizer indexing failed. Dumping files.")
context.write_output_file(job, xg_filename)
context.write_output_file(job, distance_filename)
context.write_output_file(job, gbwt_filename)
context.write_output_file(job, minimizer_filename)
raise

if index_name is not None:
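
And the matching minimizer invocation now hands vg the distance index alongside the GBWT and XG. As a standalone sketch (again assuming vg on PATH; file names are placeholders):

import subprocess

subprocess.run(['vg', 'minimizer', '-t', '4',
                '-i', 'graph.min',
                '--distance-index', 'graph.dist',  # newly required input
                '-g', 'graph.gbwt',
                'graph.xg'], check=True)
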
@@ -1175,13 +1200,12 @@ def run_indexing(job, context, inputGraphFileIDs,
index will also be computed. If no phasing VCFs are provided, computing
this index will be skipped.

If the 'minimizer' index is requested, the 'xg' index will also be
computed, and the 'gbwt' index will either be computed or sourced from
gbwt_id. If the 'gbwt' index is not available, computing this index will be
skipped.
If the 'minimizer' index is requested, the 'xg' and 'distance' indexes will
also be computed, and the 'gbwt' index will either be computed or sourced
from gbwt_id. If the 'gbwt' index is not available, computing this index
will be skipped.

If the 'distance' index is requested, the 'trivial_snarls' and 'xg' indexes
will also be computed.
If the 'distance' index is requested, the 'xg' index will also be computed.

If coalesce_regions is set, it must be a list of sets of 'chroms' region
names. Each set of region names will be expected to be together in a graph
@@ -1234,13 +1258,13 @@ def run_indexing(job, context, inputGraphFileIDs,
if 'minimizer' in wanted:
# The minimizer index has some dependencies
wanted.add('xg')
wanted.add('distance')
if not gbwt_id:
wanted.add('gbwt')

if 'distance' in wanted:
# The distance index also has some dependencies
wanted.add('xg')
wanted.add('trivial_snarls')

# We guarantee that if 'gbwt' is in indexes, then there is (a promise for)
# an actual GBWT.
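
The wanted-set expansion above is easiest to see in isolation. A self-contained sketch of the same closure rules (expand_wanted is a hypothetical name, not a function in this module):

def expand_wanted(wanted, have_gbwt_id=False):
    wanted = set(wanted)
    if 'minimizer' in wanted:
        # Minimizer indexing needs the XG, the distance index, and a GBWT.
        wanted |= {'xg', 'distance'}
        if not have_gbwt_id:
            wanted.add('gbwt')
    if 'distance' in wanted:
        # Distance indexing now only needs the XG.
        wanted.add('xg')
    return wanted

assert expand_wanted({'minimizer'}) == {'minimizer', 'gbwt', 'xg', 'distance'}
assert expand_wanted({'distance'}) == {'distance', 'xg'}
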
@@ -1406,15 +1430,13 @@ def run_indexing(job, context, inputGraphFileIDs,
indexes['trivial_snarls'] = trivial_snarls_job.rv()

if 'distance' in wanted:
# We need a distance index, based on the XG and the trivial snarls, which we know are being computed.
# We need a distance index, based on the XG, which we know is being computed.
# Run it after our XG
distance_job = xg_root_job.addFollowOnJobFn(run_distance_indexing, context, indexes['xg'],
indexes['trivial_snarls'], index_name,
index_name,
cores=context.config.distance_index_cores,
memory=context.config.distance_index_mem,
disk=context.config.distance_index_disk)
# Make sure it waits for trivial snarls
trivial_snarls_job.addFollowOn(distance_job)

indexes['distance'] = distance_job.rv()

@@ -1428,10 +1450,13 @@ def run_indexing(job, context, inputGraphFileIDs,
# We know that, if the GBWT is being computed, it also happens under the XG job.
# TODO: change that.
minimizer_job = xg_root_job.addFollowOnJobFn(run_minimizer_indexing, context, indexes['xg'],
indexes['gbwt'], index_name,
indexes['distance'], indexes['gbwt'], index_name,
cores=context.config.minimizer_index_cores,
memory=context.config.minimizer_index_mem,
disk=context.config.minimizer_index_disk)

# Also wait for the distance index
distance_job.addFollowOn(minimizer_job)

indexes['minimizer'] = minimizer_job.rv()
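
The scheduling trick here: the minimizer job is created as a follow-on of the XG job, then additionally registered as a follow-on of the distance job, so it cannot start before both finish. A toy sketch of that Toil pattern (job names are illustrative, not toil-vg's):

from toil.common import Toil
from toil.job import Job

def step(job, name):
    job.fileStore.logToMaster('ran ' + name)

if __name__ == '__main__':
    options = Job.Runner.getDefaultOptions('./demo-jobstore')
    xg = Job.wrapJobFn(step, 'xg')
    distance = xg.addFollowOnJobFn(step, 'distance')
    minimizer = xg.addFollowOnJobFn(step, 'minimizer')
    distance.addFollowOn(minimizer)  # minimizer now also waits on distance
    with Toil(options) as toil:
        toil.start(xg)
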
