Implementation of CohortIncidence module. #147

Merged

Changes from 1 commit
133 changes: 85 additions & 48 deletions R/Module-CohortIncidence.R
@@ -7,6 +7,8 @@ CohortIncidenceModule <- R6::R6Class(
classname = "CohortIncidenceModule",
inherit = StrategusModule,
public = list(
#' @field tablePrefix The table prefix to append to results tables
tablePrefix = "ci_",
#' @description Initialize the module
initialize = function() {
super$initialize()
@@ -16,32 +18,35 @@
#' @template analysisSpecifications
#' @template executionSettings
execute = function(connectionDetails, analysisSpecifications, executionSettings) {
super$execute(connectionDetails, analysisSpecifications, executionSettings)
on.exit(private$.clearLoggers())
errorMessages <- checkmate::makeAssertCollection()
checkmate::assertClass(connectionDetails, "ConnectionDetails", add = errorMessages)
checkmate::assertClass(analysisSpecifications, "AnalysisSpecifications", add = errorMessages)
checkmate::assertClass(executionSettings, "ExecutionSettings", add = errorMessages)
checkmate::reportAssertions(collection = errorMessages)

# Setup the job context
private$.createJobContext(analysisSpecifications, executionSettings)

checkmate::assertClass(executionSettings, "CdmExecutionSettings")

jobContext <- private$jobContext
resultsFolder <- private$jobContext$moduleExecutionSettings$resultsSubFolder
private$.message("Validating inputs")
private$.validate()

# Establish the connection and ensure the cleanup is performed
connection <- DatabaseConnector::connect(connectionDetails)
on.exit(DatabaseConnector::disconnect(connection))

# extract CohortIncidence design from jobContext
irDesign <- as.character(CohortIncidence::IncidenceDesign$new(jobContext$settings$irDesign)$asJSON())

irDesign <- as.character(CohortIncidence::IncidenceDesign$new(private$jobContext$settings$irDesign)$asJSON())

# construct buildOptions from executionSettings
# Questions:
# Will there be a subgroup cohort table?
# Are we pulling the source name from the right place?

buildOptions <- CohortIncidence::buildOptions(
cohortTable = paste0(jobContext$moduleExecutionSettings$workDatabaseSchema, ".", jobContext$moduleExecutionSettings$cohortTableNames$cohortTable),
cdmDatabaseSchema = jobContext$moduleExecutionSettings$cdmDatabaseSchema,
sourceName = as.character(jobContext$moduleExecutionSettings$databaseId),
cohortTable = paste0(private$jobContext$moduleExecutionSettings$workDatabaseSchema, ".", private$jobContext$moduleExecutionSettings$cohortTableNames$cohortTable),
cdmDatabaseSchema = private$jobContext$moduleExecutionSettings$cdmDatabaseSchema,
sourceName = as.character(private$jobContext$moduleExecutionSettings$databaseId),
refId = 1
)

@@ -52,43 +57,39 @@
)

# Export the results
exportFolder <- jobContext$moduleExecutionSettings$resultsSubFolder
exportFolder <- private$jobContext$moduleExecutionSettings$resultsSubFolder
if (!dir.exists(exportFolder)) {
dir.create(exportFolder, recursive = TRUE)
}

rlang::inform("Export data")
if (nrow(executeResults) > 0) {
executeResults$database_id <- jobContext$moduleExecutionSettings$databaseId
} else {
executeResults$database_id <- character(0)
}
private$.message("Export data")

# apply minCellCount to executeResults
minCellCount <- jobContext$moduleExecutionSettings$minCellCount
minCellCount <- private$jobContext$moduleExecutionSettings$minCellCount
if (minCellCount > 0) {
executeResults <- private$.enforceMinCellValue(executeResults, "PERSONS_AT_RISK_PE", minCellCount)
executeResults <- private$.enforceMinCellValue(executeResults, "PERSONS_AT_RISK", minCellCount)
executeResults <- private$.enforceMinCellValue(executeResults, "PERSON_OUTCOMES_PE", minCellCount)
executeResults <- private$.enforceMinCellValue(executeResults, "PERSON_OUTCOMES", minCellCount)
executeResults <- private$.enforceMinCellValue(executeResults, "OUTCOMES_PE", minCellCount)
executeResults <- private$.enforceMinCellValue(executeResults, "OUTCOMES", minCellCount)
executeResults <- private$.enforceMinCellStats(executeResults)
executeResults$incidence_summary <- private$.enforceMinCellValue(executeResults$incidence_summary, "PERSONS_AT_RISK_PE", minCellCount)
executeResults$incidence_summary <- private$.enforceMinCellValue(executeResults$incidence_summary, "PERSONS_AT_RISK", minCellCount)
executeResults$incidence_summary <- private$.enforceMinCellValue(executeResults$incidence_summary, "PERSON_OUTCOMES_PE", minCellCount)
executeResults$incidence_summary <- private$.enforceMinCellValue(executeResults$incidence_summary, "PERSON_OUTCOMES", minCellCount)
executeResults$incidence_summary <- private$.enforceMinCellValue(executeResults$incidence_summary, "OUTCOMES_PE", minCellCount)
executeResults$incidence_summary <- private$.enforceMinCellValue(executeResults$incidence_summary, "OUTCOMES", minCellCount)
executeResults$incidence_summary <- private$.enforceMinCellStats(executeResults$incidence_summary)
}

readr::write_csv(executeResults, file.path(exportFolder, "incidence_summary.csv")) # this will be renamed later
for (tableName in names(executeResults)) {
tableData <- executeResults[[tableName]]
if (tableName == 'incidence_summary') {
if (nrow(tableData) > 0) {
tableData$database_id <- private$jobContext$moduleExecutionSettings$databaseId
} else {
tableData$database_id <- character(0)
}
}
readr::write_csv(tableData, file.path(exportFolder, paste0(self$tablePrefix, tableName, ".csv")))
}

# TODO: Move the results data model into the package
# moduleInfo <- ParallelLogger::loadSettingsFromJson("MetaData.json")
# resultsDataModel <- readr::read_csv(file = "resultsDataModelSpecification.csv", show_col_types = FALSE)
# newTableNames <- paste0(moduleInfo$TablePrefix, resultsDataModel$"table_name")
# # Rename export files based on table prefix
# file.rename(
# file.path(exportFolder, paste0(unique(resultsDataModel$"table_name"), ".csv")),
# file.path(exportFolder, paste0(unique(newTableNames), ".csv"))
# )
# resultsDataModel$table_name <- newTableNames
# readr::write_csv(resultsDataModel, file.path(exportFolder, "resultsDataModelSpecification.csv"))
resultsDataModel <- private$.getResultsDataModelSpecification()
readr::write_csv(resultsDataModel, file.path(exportFolder, "resultsDataModelSpecification.csv"))

private$.message(paste("Results available at:", resultsFolder))
},
@@ -98,15 +99,41 @@
#' @template tablePrefix
createResultsDataModel = function(resultsConnectionDetails, resultsDatabaseSchema, tablePrefix = "") {
super$createResultsDataModel(resultsConnectionDetails, resultsDatabaseSchema, tablePrefix)
stop("NOT IMPLEMENTED")
if (resultsConnectionDetails$dbms == "sqlite" & resultsDatabaseSchema != "main") {
stop("Invalid schema for sqlite, use databaseSchema = 'main'")
}

connection <- DatabaseConnector::connect(resultsConnectionDetails)
on.exit(DatabaseConnector::disconnect(connection))

# Create the results model
sql <- ResultModelManager::generateSqlSchema(schemaDefinition = private$.getResultsDataModelSpecification())
sql <- SqlRender::render(sql = sql, warnOnMissingParameters = TRUE, database_schema = resultsDatabaseSchema)
sql <- SqlRender::translate(sql = sql, targetDialect = resultsConnectionDetails$dbms)
DatabaseConnector::executeSql(connection, sql)
},
#' @description Upload the results for the module
#' @template resultsConnectionDetails
#' @template analysisSpecifications
#' @template resultsUploadSettings
uploadResults = function(resultsConnectionDetails, analysisSpecifications, resultsUploadSettings) {
super$uploadResults(resultsConnectionDetails, analysisSpecifications, resultsUploadSettings)
stop("NOT IMPLEMENTED")
resultsFolder <- private$jobContext$moduleExecutionSettings$resultsSubFolder
exportFolder <- private$jobContext$moduleExecutionSettings$resultsSubFolder

# use the results model spec that was saved along with the results output, not the embedded model spec.
resultsModelSpec <- readr::read_csv(
file = file.path(exportFolder, "resultsDataModelSpecification.csv"),
show_col_types = FALSE
)

ResultModelManager::uploadResults(
connectionDetails = resultsConnectionDetails,
schema = resultsUploadSettings$resultsDatabaseSchema,
resultsFolder = resultsFolder,
purgeSiteDataBeforeUploading = FALSE, # TODO: how do we decide when to purge? Should that be in resultsUploadSettings?
specifications = resultsModelSpec
)
},
#' @description Creates the CohortIncidence Module Specifications
#' @param irDesign The incidence rate design created from the CohortIncidence
@@ -128,22 +155,18 @@
super$validateModuleSpecifications(
moduleSpecifications = moduleSpecifications
)
}
),
private = list(
.validate = function() {
# Validate that the analysis specification will work when we
# enter the execute statement. This is done by deserializing the design.
irDesign <- CohortIncidence::IncidenceDesign$new(private$jobContext$settings$irDesign)
irDesign <- CohortIncidence::IncidenceDesign$new(moduleSpecifications$settings$irDesign)
designJson <- rJava::J("org.ohdsi.analysis.cohortincidence.design.CohortIncidence")$fromJson(as.character(irDesign$asJSON()))

invisible(designJson)
},
}
),
private = list(
.enforceMinCellValue = function(data, fieldName, minValues, silent = FALSE) {
toCensor <- !is.na(data[, fieldName]) & data[, fieldName] < minValues & data[, fieldName] != 0
if (!silent) {
percent <- round(100 * sum(toCensor) / nrow(data), 1)
ParallelLogger::logInfo(
private$.message(
" censoring ",
sum(toCensor),
" values (",
@@ -166,6 +189,20 @@
data[toCensor, "INCIDENCE_PROPORTION_P100P"] <- NA

return(data)
},
.getResultsDataModelSpecification = function() {
rdms <- readr::read_csv(
file = private$.getResultsDataModelSpecificationFileLocation(),
show_col_types = FALSE
)
rdms$tableName <- paste0(self$tablePrefix, rdms$tableName)
return(rdms)
},
.getResultsDataModelSpecificationFileLocation = function() {
return(system.file(
file.path("csv", "cohortIncidenceRdms.csv"),
package = "Strategus"
))
}
)
)
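
For orientation, a hypothetical usage sketch of the module implemented above (createEmptyAnalysisSpecificiations and addModuleSpecifications are Strategus functions; the irDesignJson object is assumed to have been authored separately with the CohortIncidence package):

```r
# Hypothetical usage sketch: build module specifications from an incidence
# design that was serialized to JSON with the CohortIncidence package.
library(Strategus)

ciModule <- CohortIncidenceModule$new()
ciModuleSpecs <- ciModule$createModuleSpecifications(irDesign = irDesignJson)

# Add the module specifications to an (otherwise empty) analysis specification
analysisSpecifications <- createEmptyAnalysisSpecificiations() |>
  addModuleSpecifications(ciModuleSpecs)
```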
45 changes: 45 additions & 0 deletions inst/csv/cohortIncidenceRdms.csv
Comment from @azimov (Collaborator):
My feeling is that this should be in the package, for several reasons:

  1. If a change is introduced in the model, for any reason, it requires a release of both packages. Updating just one will result in a break in interoperability.
  2. Users will always need to work with their data; even if they don't use Strategus, they may wish to use the Shiny apps or other report-generation tools. Now they will have a dependency they didn't previously need.
  3. The current release of CohortIncidence just introduced a breaking change with regard to Shiny apps (and any other apps) by not implementing migrations. Currently existing projects will no longer be upgradable without custom hacks, for what is a relatively small change to the data model.
  4. It's inconsistent with other packages and utilities.
  5. It places a burden on the maintainer of Strategus to maintain 100% version consistency with CohortIncidence.

Reply from @chrisknoll (Contributor, Author), Jul 17, 2024:

Thanks for the questions and comments, @azimov. I'll preface this by saying I gave a lot of consideration to what goes into the package versus what is part of the module. The deciding factor was: if it was a Strategus concern, it went into the module; if it was a package concern, it went into the package.

> If a change is introduced in the model, for any reason, it requires a release of both packages. Updating just one will result in a break in interoperability.

Not necessarily: if the new version of the package produces the same output from the same input, then I don't think you need a new version of Strategus (just reference the new version in renv.lock). If the analytics change, either by inputs or outputs, I think it would make sense that a different version of Strategus would be necessary to represent the changes in dependencies. I would suggest two things on this front: that a release of Strategus comes with a published renv.lock file recording which versions of the package dependencies have been tested with that version of Strategus, and that within a single release of Strategus you may have multiple updates to the underlying packages.
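
As a rough sketch of that renv.lock suggestion (the lockfile path and the version pinned below are hypothetical; renv itself is the real package):

```r
# Hypothetical sketch: a study site restores the package versions that were
# tested with the given Strategus release from its published lockfile.
renv::restore(lockfile = "renv.lock")

# Within the same Strategus release, a newer compatible CohortIncidence could
# be pinned by updating the lockfile record; "4.0.1" is a made-up version.
renv::record("CohortIncidence@4.0.1")
```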

> Users will always need to work with their data; even if they don't use Strategus, they may wish to use the Shiny apps or other report-generation tools. Now they will have a dependency they didn't previously need.

While this is true, you can't use OhdsiShinyModules to view results from the underlying packages at the package level (i.e., take the output of the CI package and use OhdsiShinyModules to view the results). The reason is that OhdsiShinyModules has adopted conventions that are specified by Strategus. Two examples: if you want the list of databases, you go to the database meta table (which is created by Strategus); and if you want the cohort definition names, you go to the <cg_table_prefix>cohort_definition table, which is created by CohortGenerator (Strategus enforces that CohortGenerator is part of the Strategus analysis). See the following:
https://github.com/search?q=repo%3AOHDSI%2FOhdsiShinyModules%20prefixcohort_definition&type=code
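
To illustrate that convention (a sketch only; the schema name is a placeholder, and "cg_" is assumed as the CohortGenerator table prefix):

```r
# Illustration: read cohort names the way the Shiny modules do, from the
# <cg_table_prefix>cohort_definition table in the results schema.
library(DatabaseConnector)

connection <- connect(resultsConnectionDetails)  # assumed to be defined
cohortNames <- renderTranslateQuerySql(
  connection = connection,
  sql = "SELECT cohort_definition_id, cohort_name FROM @schema.@table;",
  schema = "my_results_schema",
  table = "cg_cohort_definition"
)
disconnect(connection)
```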

Based on these decisions, I would suggest that OhdsiShinyModules become something like StrategusShinyModules, or that we embed the Shiny modules into the release of Strategus, so that the execution of the analysis, the persistence of results, and the report viewer are all bound to the same version of the software. I fully understand that the original intent of OhdsiShinyModules was to view results of the individual HADES packages, but, in my estimation, this principle has been abandoned in favor of Strategus-specific concerns.

> The current release of CohortIncidence just introduced a breaking change with regard to Shiny apps (and any other apps) by not implementing migrations. Currently existing projects will no longer be upgradable without custom hacks, for what is a relatively small change to the data model.

I thought about this, but I decided on this approach for two reasons: 1) you won't be changing the version of the analysis packages within a single project, and 2) while I would have preferred a migration approach, the most expedient way was to use the functionality in RMM to create a schema based on the results model spec. I'd like to move to a pure-migration approach to managing database schemas (as we do in WebAPI), but given that the schema shouldn't change within a study, I decided the simplest approach would be to use RMM.

> It's inconsistent with other packages and utilities.

There isn't exactly a published standard on this, and the module layer provides that structure. However, it is consistent with the EvidenceSynthesis package, which is another RMM-based module contained in Strategus.

This PR follows the Strategus conventions through the use of the common R6 classes for module implementation, and in this way it is consistent with the other packages. The module layer allows the underlying packages to be independent in how they function most effectively (in their own way) while being consistent in execution across all modules.
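
A minimal sketch of that common module shape, assuming the StrategusModule base class from this repository (the module name and table prefix below are made up, and the method bodies are elided):

```r
# Sketch of the shared R6 module pattern; real modules add validation,
# results export, schema creation, and upload logic on top of this skeleton.
MyAnalysisModule <- R6::R6Class(
  classname = "MyAnalysisModule",
  inherit = StrategusModule,
  public = list(
    tablePrefix = "my_",
    initialize = function() {
      super$initialize()
    },
    execute = function(connectionDetails, analysisSpecifications, executionSettings) {
      super$execute(connectionDetails, analysisSpecifications, executionSettings)
      # package-specific analytics run here, driven by private$jobContext
    }
  )
)
```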

There are a number of ways the packages are inconsistent: CI handles JSON serialization/deserialization differently, but that is because serialization via ParallelLogger is broken. Not every package defines inputs via R6 classes. I think there are going to be differences in approach across many of the HADES packages, and while I can understand the desire to keep things consistent, it may be more trouble than it's worth to get down to every possible detail.

> It places a burden on the maintainer of Strategus to maintain 100% version consistency with CohortIncidence.

I'm not sure this is the case, as @anthonysena wasn't involved in any of the implementation of this PR for incorporation of CI into Strategus. I do expect that we'll have different collaborators as the 'responsible party' for the maintenance of the underlying modules (which may or may not be the underlying package maintainer), and I think this will be coordinated through the Strategus team.

Thanks again for your thoughts and comments. Let me know if I misunderstood anything, and I will be glad to provide more detail.

@@ -0,0 +1,45 @@
tableName,columnName,dataType,isRequired,primaryKey
incidence_summary,ref_id,int,no,no
incidence_summary,database_id,varchar(255),yes,no
incidence_summary,source_name,varchar(255),no,no
incidence_summary,target_cohort_definition_id,bigint,no,no
incidence_summary,tar_id,bigint,no,no
incidence_summary,subgroup_id,bigint,no,no
incidence_summary,outcome_id,bigint,no,no
incidence_summary,age_group_id,int,no,no
incidence_summary,gender_id,int,no,no
incidence_summary,gender_name,varchar(255),no,no
incidence_summary,start_year,int,no,no
incidence_summary,persons_at_risk_pe,bigint,no,no
incidence_summary,persons_at_risk,bigint,no,no
incidence_summary,person_days_pe,bigint,no,no
incidence_summary,person_days,bigint,no,no
incidence_summary,person_outcomes_pe,bigint,no,no
incidence_summary,person_outcomes,bigint,no,no
incidence_summary,outcomes_pe,bigint,no,no
incidence_summary,outcomes,bigint,no,no
incidence_summary,incidence_proportion_p100p,float,no,no
incidence_summary,incidence_rate_p100py,float,no,no
target_def,ref_id,int,yes,yes
target_def,target_cohort_definition_id,bigint,yes,yes
target_def,target_name,varchar(255),no,no
outcome_def,ref_id,int,yes,yes
outcome_def,outcome_id,bigint,yes,yes
outcome_def,outcome_cohort_definition_id,bigint,no,no
outcome_def,outcome_name,varchar(255),no,no
outcome_def,clean_window,bigint,no,no
outcome_def,excluded_cohort_definition_id,bigint,no,no
tar_def,ref_id,int,yes,yes
tar_def,tar_id,bigint,yes,yes
tar_def,tar_start_with,varchar(10),no,no
tar_def,tar_start_offset,bigint,no,no
tar_def,tar_end_with,varchar(10),no,no
tar_def,tar_end_offset,bigint,no,no
age_group_def,ref_id,int,yes,yes
age_group_def,age_group_id,int,yes,yes
age_group_def,age_group_name,varchar(255),yes,no
age_group_def,min_age,int,no,no
age_group_def,max_age,int,no,no
subgroup_def,ref_id,int,yes,yes
subgroup_def,subgroup_id,bigint,no,yes
subgroup_def,subgroup_name,varchar(255),no,no
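
As a usage note on this model (a sketch under assumptions: the module's "ci_" table prefix and a placeholder results schema), a summary row joins back to its reference tables on ref_id plus the respective definition keys:

```r
# Hypothetical sketch: join the uploaded incidence summary to its target and
# outcome reference tables; schema name and table prefix are assumptions.
library(DatabaseConnector)

connection <- connect(resultsConnectionDetails)
sql <- "
  SELECT t.target_name,
         o.outcome_name,
         s.persons_at_risk,
         s.outcomes,
         s.incidence_rate_p100py
  FROM @schema.ci_incidence_summary s
  INNER JOIN @schema.ci_target_def t
    ON s.ref_id = t.ref_id
   AND s.target_cohort_definition_id = t.target_cohort_definition_id
  INNER JOIN @schema.ci_outcome_def o
    ON s.ref_id = o.ref_id
   AND s.outcome_id = o.outcome_id;
"
summaryRows <- renderTranslateQuerySql(connection, sql, schema = "my_results_schema")
disconnect(connection)
```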