supporting more platforms
- adding edits based on a commit to the old Characterization package by Anthony Sena
jreps committed Aug 5, 2024
1 parent a649f55 commit fa8ccd3
Showing 6 changed files with 300 additions and 12 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/R_CMD_check_Hades.yaml
@@ -48,9 +48,20 @@ jobs:
CDM5_REDSHIFT_PASSWORD: ${{ secrets.CDM5_REDSHIFT_PASSWORD }}
CDM5_REDSHIFT_SERVER: ${{ secrets.CDM5_REDSHIFT_SERVER }}
CDM5_REDSHIFT_USER: ${{ secrets.CDM5_REDSHIFT_USER }}
CDM_SNOWFLAKE_CDM53_SCHEMA: ${{ secrets.CDM_SNOWFLAKE_CDM53_SCHEMA }}
CDM_SNOWFLAKE_OHDSI_SCHEMA: ${{ secrets.CDM_SNOWFLAKE_OHDSI_SCHEMA }}
CDM_SNOWFLAKE_PASSWORD: ${{ secrets.CDM_SNOWFLAKE_PASSWORD }}
CDM_SNOWFLAKE_CONNECTION_STRING: ${{ secrets.CDM_SNOWFLAKE_CONNECTION_STRING }}
CDM_SNOWFLAKE_USER: ${{ secrets.CDM_SNOWFLAKE_USER }}
CDM5_SPARK_USER: ${{ secrets.CDM5_SPARK_USER }}
CDM5_SPARK_PASSWORD: ${{ secrets.CDM5_SPARK_PASSWORD }}
CDM5_SPARK_CONNECTION_STRING: ${{ secrets.CDM5_SPARK_CONNECTION_STRING }}
CDM5_SPARK_CDM_SCHEMA: ${{ secrets.CDM5_SPARK_CDM_SCHEMA }}
CDM5_SPARK_OHDSI_SCHEMA: ${{ secrets.CDM5_SPARK_OHDSI_SCHEMA }}
CDM_BIG_QUERY_CONNECTION_STRING: ${{ secrets.CDM_BIG_QUERY_CONNECTION_STRING }}
CDM_BIG_QUERY_KEY_FILE: ${{ secrets.CDM_BIG_QUERY_KEY_FILE }}
CDM_BIG_QUERY_CDM_SCHEMA: ${{ secrets.CDM_BIG_QUERY_CDM_SCHEMA }}
CDM_BIG_QUERY_OHDSI_SCHEMA: ${{ secrets.CDM_BIG_QUERY_OHDSI_SCHEMA }}

steps:
- uses: actions/checkout@v3
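Note: these new entries expose Snowflake, Spark, and BigQuery credentials to R CMD check as environment variables, alongside the existing Redshift ones; the platform tests added in tests/testthat/test-dbs.R read them with Sys.getenv(). A minimal sketch (not part of the commit) of the kind of guard this enables, with an illustrative test name and skip message:

# Illustrative only: skip a platform when its secret is not configured
# (e.g. on forks without access to the repository secrets).
testthat::test_that("snowflake credentials are available", {
  testthat::skip_if(Sys.getenv("CDM_SNOWFLAKE_USER") == "", "CDM_SNOWFLAKE_USER not set")
  testthat::expect_true(nzchar(Sys.getenv("CDM_SNOWFLAKE_CONNECTION_STRING")))
})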
3 changes: 2 additions & 1 deletion R/Database.R
@@ -148,7 +148,8 @@ createCharacterizationTables <- function(
deleteExistingTables = T,
createTables = T,
tablePrefix = "c_",
tempEmulationSchema = getOption("sqlRenderTempEmulationSchema")) {
tempEmulationSchema = getOption("sqlRenderTempEmulationSchema")
) {
errorMessages <- checkmate::makeAssertCollection()
.checkTablePrefix(
tablePrefix = tablePrefix,
2 changes: 1 addition & 1 deletion R/RunCharacterization.R
@@ -137,7 +137,7 @@ runCharacterizationAnalyses <- function(
targetTable,
outcomeDatabaseSchema,
outcomeTable,
tempEmulationSchema = NULL,
tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"),
cdmDatabaseSchema,
characterizationSettings,
outputDirectory,
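With this change both createCharacterizationTables() and runCharacterizationAnalyses() default tempEmulationSchema to the sqlRenderTempEmulationSchema option rather than NULL. A minimal sketch (not part of the commit) of setting that option once per session for platforms that emulate temp tables (Oracle, BigQuery, Snowflake, Spark); the schema name is a placeholder:

# Illustrative only: a scratch schema the user can write to; SqlRender uses it to
# emulate temp tables, and the changed defaults pick it up via getOption().
options(sqlRenderTempEmulationSchema = "my_scratch_schema")  # placeholder schema name
getOption("sqlRenderTempEmulationSchema")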
17 changes: 9 additions & 8 deletions inst/sql/sql_server/CaseCohortsPart1.sql
@@ -1,14 +1,14 @@
--need to know indication/target/outcome tuples

-- all targets
drop table if exists #targets_all;
IF OBJECT_ID('tempdb..#targets_all', 'U') IS NOT NULL DROP TABLE #targets_all;
select * into #targets_all
from @target_database_schema.@target_table
where cohort_definition_id in
(@target_ids);

-- first T with > minPrioObs
drop table if exists #targets_inclusions;
IF OBJECT_ID('tempdb..#targets_inclusions', 'U') IS NOT NULL DROP TABLE #targets_inclusions;
select * into #targets_inclusions
from
(select *,
@@ -25,19 +25,19 @@ where temp_t.rn = 1
and datediff(day, op.observation_period_start_date, temp_t.cohort_start_date) >= @min_prior_observation;

-- all outcomes
drop table if exists #outcomes_all;
IF OBJECT_ID('tempdb..#outcomes_all', 'U') IS NOT NULL DROP TABLE #outcomes_all;
select * into #outcomes_all
from @outcome_database_schema.@outcome_table
where cohort_definition_id in
(@outcome_ids);

-- first outcomes in washout days and min prior obs
drop table if exists #outcomes_washout;
IF OBJECT_ID('tempdb..#outcomes_washout', 'U') IS NOT NULL DROP TABLE #outcomes_washout;
select o.* into #outcomes_washout
from (select *,
ISNULL(datediff(day, LAG(cohort_start_date) OVER(partition by subject_id, cohort_definition_id order by cohort_start_date asc), cohort_start_date ), 100000) as time_between
from #outcomes_all
) as o
) o
inner join @cdm_database_schema.observation_period op
on op.person_id = o.subject_id
and o.cohort_start_date >= op.observation_period_start_date
@@ -47,7 +47,7 @@ and datediff(day, op.observation_period_start_date, o.cohort_start_date) >= @min


-- 2) get all the people with the outcome during washout
drop table if exists #case_exclude;
IF OBJECT_ID('tempdb..#case_exclude', 'U') IS NOT NULL DROP TABLE #case_exclude;

-- people with outcome prior
select
@@ -68,7 +68,7 @@ o.cohort_start_date <= dateadd(day, -1, t.cohort_start_date);


---- Create TAR agnostic cohorts
drop table if exists #cases;
IF OBJECT_ID('tempdb..#cases', 'U') IS NOT NULL DROP TABLE #cases;
select * into #cases

from
@@ -89,4 +89,5 @@ and cd.cohort_type = 'Exclude' -- changed from TnOprior
) temp_ts2;

-- drop the table needed by the case series cohorts
drop table if exists #case_series;
IF OBJECT_ID('tempdb..#case_series', 'U') IS NOT NULL DROP TABLE #case_series;
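The CaseCohorts SQL switches every DROP TABLE IF EXISTS to the IF OBJECT_ID(...) form and drops the AS keyword from the subquery alias, presumably because those constructs translate more reliably through SqlRender on the newly supported dialects. A minimal sketch (not part of the commit) of inspecting how one of the rewritten statements is translated; the scratch schema is a placeholder and the exact output depends on the SqlRender version:

# Illustrative only: translate one rewritten statement to a target dialect to see
# what would actually be executed there.
SqlRender::translate(
  sql = "IF OBJECT_ID('tempdb..#targets_all', 'U') IS NOT NULL DROP TABLE #targets_all;",
  targetDialect = "spark",
  tempEmulationSchema = "scratch"  # placeholder schema used when temp tables are emulated
)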

3 changes: 1 addition & 2 deletions inst/sql/sql_server/CaseCohortsPart2.sql
@@ -1,8 +1,7 @@
-- PER TAR RUN TO GET TnO cohorts

-- 1) get all the people with the outcome in TAR
drop table if exists #cases_tar;

IF OBJECT_ID('tempdb..#cases_tar', 'U') IS NOT NULL DROP TABLE #cases_tar;
-- cases
select
t.subject_id,
276 changes: 276 additions & 0 deletions tests/testthat/test-dbs.R
@@ -0,0 +1,276 @@
# specify databases to test

dbmsPlatforms <- c(
"bigquery",
"oracle",
"postgresql",
"redshift",
"snowflake",
"spark",
"sql server"
)

getPlatformConnectionDetails <- function(dbmsPlatform) {
# Get drivers for test platform
if (dir.exists(Sys.getenv("DATABASECONNECTOR_JAR_FOLDER"))) {
jdbcDriverFolder <- Sys.getenv("DATABASECONNECTOR_JAR_FOLDER")
} else {
jdbcDriverFolder <- "~/.jdbcDrivers"
dir.create(jdbcDriverFolder, showWarnings = FALSE)
}

options("sqlRenderTempEmulationSchema" = NULL)
if (dbmsPlatform == "sqlite") {
connectionDetails <- Eunomia::getEunomiaConnectionDetails()
cdmDatabaseSchema <- "main"
vocabularyDatabaseSchema <- "main"
cohortDatabaseSchema <- "main"
options("sqlRenderTempEmulationSchema" = NULL)
cohortTable <- "cohort"
} else {
if (dbmsPlatform == "bigquery") {
# To avoid rate limit on BigQuery, only test on 1 OS:
if (.Platform$OS.type == "windows") {
bqKeyFile <- tempfile(fileext = ".json")
writeLines(Sys.getenv("CDM_BIG_QUERY_KEY_FILE"), bqKeyFile)
if (testthat::is_testing()) {
withr::defer(unlink(bqKeyFile, force = TRUE), testthat::teardown_env())
}
bqConnectionString <- gsub(
"<keyfile path>",
normalizePath(bqKeyFile, winslash = "/"),
Sys.getenv("CDM_BIG_QUERY_CONNECTION_STRING")
)
connectionDetails <- DatabaseConnector::createConnectionDetails(
dbms = dbmsPlatform,
user = "",
password = "",
connectionString = !!bqConnectionString,
pathToDriver = jdbcDriverFolder
)
cdmDatabaseSchema <- Sys.getenv("CDM_BIG_QUERY_CDM_SCHEMA")
vocabularyDatabaseSchema <- Sys.getenv("CDM_BIG_QUERY_CDM_SCHEMA")
cohortDatabaseSchema <- Sys.getenv("CDM_BIG_QUERY_OHDSI_SCHEMA")
options(sqlRenderTempEmulationSchema = Sys.getenv("CDM_BIG_QUERY_OHDSI_SCHEMA"))
} else {
return(NULL)
}
} else if (dbmsPlatform == "oracle") {
connectionDetails <- DatabaseConnector::createConnectionDetails(
dbms = dbmsPlatform,
user = Sys.getenv("CDM5_ORACLE_USER"),
password = URLdecode(Sys.getenv("CDM5_ORACLE_PASSWORD")),
server = Sys.getenv("CDM5_ORACLE_SERVER"),
pathToDriver = jdbcDriverFolder
)
cdmDatabaseSchema <- Sys.getenv("CDM5_ORACLE_CDM_SCHEMA")
vocabularyDatabaseSchema <- Sys.getenv("CDM5_ORACLE_CDM_SCHEMA")
cohortDatabaseSchema <- Sys.getenv("CDM5_ORACLE_OHDSI_SCHEMA")
options(sqlRenderTempEmulationSchema = Sys.getenv("CDM5_ORACLE_OHDSI_SCHEMA"))
} else if (dbmsPlatform == "postgresql") {
connectionDetails <- DatabaseConnector::createConnectionDetails(
dbms = dbmsPlatform,
user = Sys.getenv("CDM5_POSTGRESQL_USER"),
password = URLdecode(Sys.getenv("CDM5_POSTGRESQL_PASSWORD")),
server = Sys.getenv("CDM5_POSTGRESQL_SERVER"),
pathToDriver = jdbcDriverFolder
)
cdmDatabaseSchema <- Sys.getenv("CDM5_POSTGRESQL_CDM_SCHEMA")
vocabularyDatabaseSchema <- Sys.getenv("CDM5_POSTGRESQL_CDM_SCHEMA")
cohortDatabaseSchema <- Sys.getenv("CDM5_POSTGRESQL_OHDSI_SCHEMA")
} else if (dbmsPlatform == "redshift") {
connectionDetails <- DatabaseConnector::createConnectionDetails(
dbms = dbmsPlatform,
user = Sys.getenv("CDM5_REDSHIFT_USER"),
password = URLdecode(Sys.getenv("CDM5_REDSHIFT_PASSWORD")),
server = Sys.getenv("CDM5_REDSHIFT_SERVER"),
pathToDriver = jdbcDriverFolder
)
cdmDatabaseSchema <- Sys.getenv("CDM5_REDSHIFT_CDM_SCHEMA")
vocabularyDatabaseSchema <- Sys.getenv("CDM5_REDSHIFT_CDM_SCHEMA")
cohortDatabaseSchema <- Sys.getenv("CDM5_REDSHIFT_OHDSI_SCHEMA")
} else if (dbmsPlatform == "snowflake") {
connectionDetails <- DatabaseConnector::createConnectionDetails(
dbms = dbmsPlatform,
user = Sys.getenv("CDM_SNOWFLAKE_USER"),
password = URLdecode(Sys.getenv("CDM_SNOWFLAKE_PASSWORD")),
connectionString = Sys.getenv("CDM_SNOWFLAKE_CONNECTION_STRING"),
pathToDriver = jdbcDriverFolder
)
cdmDatabaseSchema <- Sys.getenv("CDM_SNOWFLAKE_CDM53_SCHEMA")
vocabularyDatabaseSchema <- Sys.getenv("CDM_SNOWFLAKE_CDM53_SCHEMA")
cohortDatabaseSchema <- Sys.getenv("CDM_SNOWFLAKE_OHDSI_SCHEMA")
options(sqlRenderTempEmulationSchema = Sys.getenv("CDM_SNOWFLAKE_OHDSI_SCHEMA"))
} else if (dbmsPlatform == "spark") {
connectionDetails <- DatabaseConnector::createConnectionDetails(
dbms = dbmsPlatform,
user = Sys.getenv("CDM5_SPARK_USER"),
password = URLdecode(Sys.getenv("CDM5_SPARK_PASSWORD")),
connectionString = Sys.getenv("CDM5_SPARK_CONNECTION_STRING"),
pathToDriver = jdbcDriverFolder
)
cdmDatabaseSchema <- Sys.getenv("CDM5_SPARK_CDM_SCHEMA")
vocabularyDatabaseSchema <- Sys.getenv("CDM5_SPARK_CDM_SCHEMA")
cohortDatabaseSchema <- Sys.getenv("CDM5_SPARK_OHDSI_SCHEMA")
options(sqlRenderTempEmulationSchema = Sys.getenv("CDM5_SPARK_OHDSI_SCHEMA"))
} else if (dbmsPlatform == "sql server") {
connectionDetails <- DatabaseConnector::createConnectionDetails(
dbms = dbmsPlatform,
user = Sys.getenv("CDM5_SQL_SERVER_USER"),
password = URLdecode(Sys.getenv("CDM5_SQL_SERVER_PASSWORD")),
server = Sys.getenv("CDM5_SQL_SERVER_SERVER"),
pathToDriver = jdbcDriverFolder
)
cdmDatabaseSchema <- Sys.getenv("CDM5_SQL_SERVER_CDM_SCHEMA")
vocabularyDatabaseSchema <- Sys.getenv("CDM5_SQL_SERVER_CDM_SCHEMA")
cohortDatabaseSchema <- Sys.getenv("CDM5_SQL_SERVER_OHDSI_SCHEMA")
}

# Add drivers
DatabaseConnector::downloadJdbcDrivers(dbmsPlatform, pathToDriver = jdbcDriverFolder)
# Table created to avoid collisions
cohortTable <- paste0("ct_", Sys.getpid(), format(Sys.time(), "%s"), sample(1:100, 1))
}

return(list(
dbmsPlatform = dbmsPlatform,
connectionDetails = connectionDetails,
cohortDatabaseSchema = cohortDatabaseSchema,
cohortTable = cohortTable,
cdmDatabaseSchema = cdmDatabaseSchema,
vocabularyDatabaseSchema = vocabularyDatabaseSchema
))
}

for(dbmsPlatform in dbmsPlatforms){
if(Sys.getenv('CI') == 'true'){
tempFolder <- tempfile(paste0("Characterization_", dbmsPlatform))
on.exit(unlink(tempFolder, recursive = TRUE), add = TRUE)

dbmsDetails <- getPlatformConnectionDetails(dbmsPlatform)
if (!is.null(dbmsDetails)) {
  # only connect when details are available (e.g. BigQuery is skipped off Windows)
  con <- DatabaseConnector::connect(dbmsDetails$connectionDetails)
  on.exit(DatabaseConnector::disconnect(con), add = TRUE)
}
}

# This file contains platform specific tests
test_that(paste0("platform specific test ", dbmsPlatform), {
skip_if(Sys.getenv('CI') != 'true', 'not run locally')
if (is.null(dbmsDetails)) {
print(paste("No platform details available for", dbmsPlatform))
} else {

# create a cohort table
DatabaseConnector::insertTable(
connection = con,
databaseSchema = dbmsDetails$cohortDatabaseSchema,
tableName = dbmsDetails$cohortTable,
data = data.frame(
subject_id = 1:10,
cohort_definition_id = sample(4, 10, replace = T),
cohort_start_date = rep('20100101', 10),
cohort_end_date = rep('20100101', 10)
))

targetIds <- c(1, 2, 4)
outcomeIds <- c(3)

timeToEventSettings1 <- createTimeToEventSettings(
targetIds = 1,
outcomeIds = c(3, 4)
)
timeToEventSettings2 <- createTimeToEventSettings(
targetIds = 2,
outcomeIds = c(3, 4)
)

dechallengeRechallengeSettings <- createDechallengeRechallengeSettings(
targetIds = targetIds,
outcomeIds = outcomeIds,
dechallengeStopInterval = 30,
dechallengeEvaluationWindow = 31
)

aggregateCovariateSettings1 <- createAggregateCovariateSettings(
targetIds = targetIds,
outcomeIds = outcomeIds,
riskWindowStart = 1,
startAnchor = "cohort start",
riskWindowEnd = 365,
endAnchor = "cohort start",
covariateSettings = FeatureExtraction::createCovariateSettings(
useDemographicsGender = T,
useDemographicsAge = T,
useDemographicsRace = T
)
)

aggregateCovariateSettings2 <- createAggregateCovariateSettings(
targetIds = targetIds,
outcomeIds = outcomeIds,
riskWindowStart = 1,
startAnchor = "cohort start",
riskWindowEnd = 365,
endAnchor = "cohort start",
covariateSettings = FeatureExtraction::createCovariateSettings(
useConditionOccurrenceLongTerm = T
)
)

characterizationSettings <- createCharacterizationSettings(
timeToEventSettings = list(
timeToEventSettings1,
timeToEventSettings2
),
dechallengeRechallengeSettings = list(
dechallengeRechallengeSettings
),
aggregateCovariateSettings = list(
aggregateCovariateSettings1,
aggregateCovariateSettings2
)
)

runCharacterizationAnalyses(
connectionDetails = dbmsDetails$connectionDetails,
cdmDatabaseSchema = dbmsDetails$cdmDatabaseSchema,
targetDatabaseSchema = dbmsDetails$cohortDatabaseSchema,
targetTable = dbmsDetails$cohortTable,
outcomeDatabaseSchema = dbmsDetails$cohortDatabaseSchema,
outcomeTable = dbmsDetails$cohortTable,
characterizationSettings = characterizationSettings,
outputDirectory = file.path(tempFolder, 'csv'),
executionPath = file.path(tempFolder, 'execution'),
csvFilePrefix = "c_",
threads = 1,
databaseId = dbmsDetails$connectionDetails$dbms
)

testthat::expect_true(
length(dir(file.path(tempFolder, "csv"))) > 0
)

# check cohort details is saved
testthat::expect_true(
file.exists(file.path(tempFolder, "csv", "c_cohort_details.csv"))
)
testthat::expect_true(
file.exists(file.path(tempFolder, "csv", "c_settings.csv"))
)
testthat::expect_true(
file.exists(file.path(tempFolder, "csv", "c_analysis_ref.csv"))
)
testthat::expect_true(
file.exists(file.path(tempFolder, "csv", "c_covariate_ref.csv"))
)
testthat::expect_true(
file.exists(file.path(tempFolder, "csv", "c_dechallenge_rechallenge.csv"))
)
#testthat::expect_true(
# file.exists(file.path(tempFolder, "csv", "rechallenge_fail_case_series.csv"))
#)
testthat::expect_true(
file.exists(file.path(tempFolder, "csv", "c_time_to_event.csv"))
)
}
})
}
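The loop above only connects and runs the analyses when the CI environment variable is 'true', with each platform's credentials taken from the environment variables wired up in the workflow change. A minimal sketch (not part of the commit) of exercising the helper for a single platform after sourcing this file; every value below is a placeholder:

# Illustrative only: supply placeholder credentials and build connection details
# for one platform via the getPlatformConnectionDetails() helper defined above.
Sys.setenv(
  DATABASECONNECTOR_JAR_FOLDER = "~/.jdbcDrivers",
  CDM5_POSTGRESQL_USER = "ohdsi_user",        # placeholder
  CDM5_POSTGRESQL_PASSWORD = "secret",        # placeholder
  CDM5_POSTGRESQL_SERVER = "myserver/cdm_db", # placeholder
  CDM5_POSTGRESQL_CDM_SCHEMA = "cdm",         # placeholder
  CDM5_POSTGRESQL_OHDSI_SCHEMA = "results"    # placeholder
)
dbmsDetails <- getPlatformConnectionDetails("postgresql")
con <- DatabaseConnector::connect(dbmsDetails$connectionDetails)
DatabaseConnector::disconnect(con)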
