-
Notifications
You must be signed in to change notification settings - Fork 0
/
methods.config
100 lines (93 loc) · 4.47 KB
/
methods.config
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
includeConfig "${projectDir}/external/pipeline-Nextflow-config/config/methods/common_methods.config"
includeConfig "${projectDir}/external/pipeline-Nextflow-config/config/schema/schema.config"
includeConfig "${projectDir}/external/pipeline-Nextflow-config/config/bam/bam_parser.config"
includeConfig "${projectDir}/external/pipeline-Nextflow-config/config/retry/retry.config"
methods {
// Parse the header of every input BAM and record the sample, libraries and
// read groups it declares.
//
// Reads:  params.input['BAM'] - iterated as (key, collection of maps); each map
//         must provide 'path' and may provide 'read_length'.
// Writes: params.samples_to_process, params.libraries_to_process and
//         params.readgroups_to_process (each is reset to an empty Set first).
//
// Throws Exception when a BAM header yields no SM value at all, yields more
// than one distinct SM value, or when the same SM value was already seen in
// another BAM.
get_ids_from_bams = {
    params.samples_to_process = [] as Set
    params.readgroups_to_process = [] as Set
    params.libraries_to_process = [] as Set

    params.input['BAM'].each { k, v ->
        v.each { sampleMap ->
            def bam_path = sampleMap['path']
            def bam_header = bam_parser.parse_bam_header(bam_path)
            def read_groups = bam_header['read_group']

            // Distinct SM values across all read groups of this BAM.
            def sm_tags = read_groups.collect{ it['SM'] }.unique()

            // A header without read groups used to fall into the "multiple samples"
            // branch below; report that case with an accurate message instead.
            if (sm_tags.isEmpty()) {
                throw new Exception("${bam_path} contains no read groups, so no sample (SM) could be determined! Please provide a BAM with read group information.")
            }

            if (sm_tags.size() != 1) {
                throw new Exception("${bam_path} contains multiple samples! Please run pipeline with single sample BAMs.")
            }

            def orig_sm_tag = sm_tags[0]

            // Uniqueness is checked on the original (unsanitized) SM value.
            if (params.samples_to_process.any { it.orig_id == orig_sm_tag }) {
                throw new Exception("Sample ${orig_sm_tag} was found in multiple BAMs. Please provide only one BAM per sample")
            }

            def new_sm_tag = methods.sanitize_uclahs_cds_id(orig_sm_tag)

            params.samples_to_process.add([
                'bam': bam_path,
                'orig_id': orig_sm_tag,
                'sm_id': new_sm_tag,
                'read_length': sampleMap.getOrDefault('read_length', null),
                // The key under params.input['BAM'] is stored as the sample type.
                'sample_type': k
            ])

            // One entry per distinct LB value, listing the IDs of its read groups.
            read_groups.collect{ it['LB'] }.unique().each { lib ->
                def lib_id = methods.sanitize_uclahs_cds_id(lib)
                def rgs = read_groups.findAll{ it['LB'] == lib }.collect{ it['ID'] }

                params.libraries_to_process.add([
                    'bam': bam_path,
                    'sm_id': new_sm_tag,
                    'rgs': rgs,
                    'lib_id': lib_id
                ])
            }

            // One entry per read group, keeping both the original and sanitized ID.
            read_groups.each { rgMap ->
                def lib_id = methods.sanitize_uclahs_cds_id(rgMap['LB'])
                def rg_id = methods.sanitize_uclahs_cds_id(rgMap['ID'])

                params.readgroups_to_process.add([
                    'bam': bam_path,
                    'sm_id': new_sm_tag,
                    'orig_rg_id': rgMap['ID'],
                    'rg_id': rg_id,
                    'lib_id': lib_id
                ])
            }
        }
    }
}
// Set the output and log output dirs.
//
// Reads:  params.output_dir, params.patient_id, manifest.name, manifest.version
// Writes: params.output_dir_base - "<output_dir>/<name>-<version>/<patient_id>",
//                                  with spaces in patient_id replaced by '_'
//         params.log_output_dir  - "<output_dir_base>/log-<name>-<version>-<timestamp>"
// The timestamp is the current time rendered in UTC as yyyyMMdd'T'HHmmss'Z'.
set_output_dir = {
    // Declared with `def` so the time zone stays local to this closure; an
    // undeclared assignment would be resolved against the surrounding
    // config/script scope instead of creating a local variable.
    def tz = TimeZone.getTimeZone('UTC')
    def date = new Date().format("yyyyMMdd'T'HHmmss'Z'", tz)

    params.output_dir_base = "${params.output_dir}/${manifest.name}-${manifest.version}/${params.patient_id.replace(' ', '_')}"
    params.log_output_dir = "${params.output_dir_base}/log-${manifest.name}-${manifest.version}-${date}"
}
// Set the `enabled` flag and the `file` path of the trace, timeline and report
// config scopes. All three files are placed in
// "<params.log_output_dir>/nextflow-log".
set_pipeline_logs = {
    def nextflow_log_dir = "${params.log_output_dir}/nextflow-log"

    trace.enabled = true
    trace.file = "${nextflow_log_dir}/trace.txt"

    timeline.enabled = true
    timeline.file = "${nextflow_log_dir}/timeline.html"

    report.enabled = true
    report.file = "${nextflow_log_dir}/report.html"
}
// Apply the optional params.base_resource_update settings.
//
// Does nothing when params has no 'base_resource_update' key or its value is
// falsy. Otherwise, for every (resource, updates) entry and every
// (processes, multiplier) pair inside `updates`, calls
// methods.update_base_resource_allocation(resource, multiplier, processes),
// wrapping `processes` in a list first when custom_schema_types.is_string
// reports it as a string.
modify_base_allocations = {
    if (!params.containsKey('base_resource_update') || !params.base_resource_update) {
        return
    }

    params.base_resource_update.each { resource_name, resource_updates ->
        resource_updates.each { target_processes, factor ->
            def process_list
            if (custom_schema_types.is_string(target_processes)) {
                // A single process name is normalized to a one-element list.
                process_list = [target_processes]
            } else {
                process_list = target_processes
            }

            methods.update_base_resource_allocation(resource_name, factor, process_list)
        }
    }
}
// Run every configuration step of the pipeline, in order.
// The steps are order-dependent; do not reorder them without checking what
// each one reads and writes.
setup = {
// Register custom schema types: first the file from the shared
// pipeline-Nextflow-config submodule, then this pipeline's own file.
schema.load_custom_types("${projectDir}/external/pipeline-Nextflow-config/config/schema/custom_schema_types.config")
schema.load_custom_types("${projectDir}/config/custom_schema_types.config")
// Validate once both sets of custom types have been loaded.
schema.validate()
// NOTE(review): set_resources_allocation is not defined in this file's methods
// block; presumably it comes from the included common_methods.config - confirm.
methods.set_resources_allocation()
// Applies params.base_resource_update (if present) via
// methods.update_base_resource_allocation; presumably must follow the base
// allocation step above - confirm.
methods.modify_base_allocations()
retry.setup_retry()
methods.set_env()
// Fills params.samples_to_process, params.libraries_to_process and
// params.readgroups_to_process from the input BAM headers.
methods.get_ids_from_bams()
// Sets params.output_dir_base and params.log_output_dir ...
methods.set_output_dir()
// ... which set_pipeline_logs reads to build the trace/timeline/report paths,
// so it has to run after set_output_dir.
methods.set_pipeline_logs()
methods.setup_docker_cpus()
methods.setup_process_afterscript()
}
}