Skip to content

Commit 3dea2d9

Browse files
authored
Merge pull request #56 from neurostuff/enh/use_sdks
[ENH] use neurostore/neurosynth compose sdks
2 parents da0edf0 + 4d40785 commit 3dea2d9

20 files changed

Lines changed: 189223 additions & 342150 deletions

compose_runner/aws_lambda/common.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,9 @@ def respond(self, body: Dict[str, Any], status_code: int = 200) -> Dict[str, Any
5353
return body
5454

5555
def bad_request(self, message: str, status_code: int = 400) -> Dict[str, Any]:
56-
return self.respond({"status": "FAILED", "error": message}, status_code=status_code)
56+
return self.respond(
57+
{"status": "FAILED", "error": message}, status_code=status_code
58+
)
5759

5860
def get(self, key: str, default: Any = None) -> Any:
5961
return self.payload.get(key, default)
60-

compose_runner/aws_lambda/cost_check_handler.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@ def _current_month_cost() -> Dict[str, Any]:
3232
Metrics=["UnblendedCost"],
3333
)
3434
results = response.get("ResultsByTime", [])
35-
total = results[0]["Total"]["UnblendedCost"] if results else {"Amount": "0", "Unit": "USD"}
35+
total = (
36+
results[0]["Total"]["UnblendedCost"]
37+
if results
38+
else {"Amount": "0", "Unit": "USD"}
39+
)
3640
amount = float(Decimal(total.get("Amount", "0")))
3741
currency = total.get("Unit", "USD")
3842
return {"amount": amount, "currency": currency, "time_period": period}

compose_runner/aws_lambda/log_poll_handler.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88

99
from compose_runner.aws_lambda.common import LambdaRequest
1010

11-
_LOGS_CLIENT = boto3.client("logs", region_name=os.environ.get("AWS_REGION", "us-east-1"))
11+
_LOGS_CLIENT = boto3.client(
12+
"logs", region_name=os.environ.get("AWS_REGION", "us-east-1")
13+
)
1214

1315
LOG_GROUP_ENV = "RUNNER_LOG_GROUP"
1416
DEFAULT_LOOKBACK_MS_ENV = "DEFAULT_LOOKBACK_MS"

compose_runner/aws_lambda/results_handler.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
3535
raise KeyError(message)
3636
expires_in = int(payload.get("expires_in", DEFAULT_EXPIRES_IN))
3737

38-
key_prefix = f"{prefix.rstrip('/')}/{artifact_prefix}" if prefix else artifact_prefix
38+
key_prefix = (
39+
f"{prefix.rstrip('/')}/{artifact_prefix}" if prefix else artifact_prefix
40+
)
3941

4042
response = _S3.list_objects_v2(Bucket=bucket, Prefix=key_prefix)
4143
contents = response.get("Contents", [])

compose_runner/aws_lambda/run_handler.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
logger = logging.getLogger(__name__)
1717
logger.setLevel(logging.INFO)
1818

19-
_SFN_CLIENT = boto3.client("stepfunctions", region_name=os.environ.get("AWS_REGION", "us-east-1"))
19+
_SFN_CLIENT = boto3.client(
20+
"stepfunctions", region_name=os.environ.get("AWS_REGION", "us-east-1")
21+
)
2022

2123
STATE_MACHINE_ARN_ENV = "STATE_MACHINE_ARN"
2224
RESULTS_BUCKET_ENV = "RESULTS_BUCKET"
@@ -44,10 +46,14 @@ def _compose_api_base_url(environment: str) -> str:
4446
return "https://compose.neurosynth.org/api"
4547

4648

47-
def _fetch_meta_analysis(meta_analysis_id: str, environment: str) -> Optional[Dict[str, Any]]:
49+
def _fetch_meta_analysis(
50+
meta_analysis_id: str, environment: str
51+
) -> Optional[Dict[str, Any]]:
4852
base_url = _compose_api_base_url(environment).rstrip("/")
4953
url = f"{base_url}/meta-analyses/{meta_analysis_id}?nested=true"
50-
request = urllib.request.Request(url, headers={"User-Agent": "compose-runner/submit"})
54+
request = urllib.request.Request(
55+
url, headers={"User-Agent": "compose-runner/submit"}
56+
)
5157
try:
5258
with urllib.request.urlopen(request, timeout=10) as response:
5359
return json.load(response)
@@ -77,7 +83,9 @@ def _requires_large_task(specification: Dict[str, Any]) -> bool:
7783
return False
7884

7985

80-
def _select_task_size(meta_analysis_id: str, environment: str, artifact_prefix: str) -> str:
86+
def _select_task_size(
87+
meta_analysis_id: str, environment: str, artifact_prefix: str
88+
) -> str:
8189
doc = _fetch_meta_analysis(meta_analysis_id, environment)
8290
if not doc:
8391
return DEFAULT_TASK_SIZE
@@ -92,7 +100,9 @@ def _select_task_size(meta_analysis_id: str, environment: str, artifact_prefix:
92100
)
93101
return "large"
94102
except Exception as exc: # noqa: broad-except
95-
logger.warning("Failed to evaluate specification for %s: %s", meta_analysis_id, exc)
103+
logger.warning(
104+
"Failed to evaluate specification for %s: %s", meta_analysis_id, exc
105+
)
96106
return DEFAULT_TASK_SIZE
97107

98108

@@ -146,9 +156,13 @@ def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
146156
nv_key = payload.get("nv_key") or os.environ.get(NV_KEY_ENV)
147157

148158
environment = payload.get("environment", "production")
149-
task_size = _select_task_size(payload["meta_analysis_id"], environment, artifact_prefix)
159+
task_size = _select_task_size(
160+
payload["meta_analysis_id"], environment, artifact_prefix
161+
)
150162

151-
job_input = _job_input(payload, artifact_prefix, bucket, prefix, nsc_key, nv_key, task_size)
163+
job_input = _job_input(
164+
payload, artifact_prefix, bucket, prefix, nsc_key, nv_key, task_size
165+
)
152166
params = {
153167
"stateMachineArn": os.environ[STATE_MACHINE_ARN_ENV],
154168
"name": artifact_prefix,

compose_runner/aws_lambda/status_handler.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010

1111
from compose_runner.aws_lambda.common import LambdaRequest
1212

13-
_SFN = boto3.client("stepfunctions", region_name=os.environ.get("AWS_REGION", "us-east-1"))
13+
_SFN = boto3.client(
14+
"stepfunctions", region_name=os.environ.get("AWS_REGION", "us-east-1")
15+
)
1416
_S3 = boto3.client("s3", region_name=os.environ.get("AWS_REGION", "us-east-1"))
1517

1618
RESULTS_BUCKET_ENV = "RESULTS_BUCKET"
@@ -28,7 +30,9 @@ def _metadata_key(prefix: Optional[str], artifact_prefix: str) -> str:
2830
return f"{artifact_prefix}/{METADATA_FILENAME}"
2931

3032

31-
def _load_metadata(bucket: str, prefix: Optional[str], artifact_prefix: str) -> Optional[Dict[str, Any]]:
33+
def _load_metadata(
34+
bucket: str, prefix: Optional[str], artifact_prefix: str
35+
) -> Optional[Dict[str, Any]]:
3236
key = _metadata_key(prefix, artifact_prefix)
3337
try:
3438
response = _S3.get_object(Bucket=bucket, Key=key)
@@ -65,7 +69,11 @@ def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
6569
except ClientError as error:
6670
body = {"status": "FAILED", "error": error.response["Error"]["Message"]}
6771
if request.is_http:
68-
status_code = 404 if error.response["Error"]["Code"] == "ExecutionDoesNotExist" else 500
72+
status_code = (
73+
404
74+
if error.response["Error"]["Code"] == "ExecutionDoesNotExist"
75+
else 500
76+
)
6977
return request.respond(body, status_code=status_code)
7078
raise
7179

@@ -83,7 +91,9 @@ def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
8391

8492
artifact_prefix = description.get("name")
8593
if not artifact_prefix:
86-
raise ValueError("Execution does not expose a name; cannot determine artifact prefix.")
94+
raise ValueError(
95+
"Execution does not expose a name; cannot determine artifact prefix."
96+
)
8797
body["artifact_prefix"] = artifact_prefix
8898

8999
if status in {"SUCCEEDED", "FAILED"}:

compose_runner/cli.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,7 @@ def cli(meta_analysis_id, environment, result_dir, nsc_key, nv_key, no_upload, n
2222
2323
META_ANALYSIS_ID is the id of the meta-analysis on neurosynth-compose.
2424
"""
25-
url, _ = run(meta_analysis_id, environment, result_dir, nsc_key, nv_key, no_upload, n_cores)
25+
url, _ = run(
26+
meta_analysis_id, environment, result_dir, nsc_key, nv_key, no_upload, n_cores
27+
)
2628
print(url)

compose_runner/ecs_task.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,23 @@ def _iter_result_files(result_dir: Path) -> Iterable[Path]:
4848
yield path
4949

5050

51-
def _upload_results(artifact_prefix: str, result_dir: Path, bucket: str, prefix: Optional[str]) -> None:
52-
base_prefix = f"{prefix.rstrip('/')}/{artifact_prefix}" if prefix else artifact_prefix
51+
def _upload_results(
52+
artifact_prefix: str, result_dir: Path, bucket: str, prefix: Optional[str]
53+
) -> None:
54+
base_prefix = (
55+
f"{prefix.rstrip('/')}/{artifact_prefix}" if prefix else artifact_prefix
56+
)
5357
for file_path in _iter_result_files(result_dir):
5458
key = f"{base_prefix}/{file_path.name}"
5559
_S3_CLIENT.upload_file(str(file_path), bucket, key)
5660

5761

58-
def _write_metadata(bucket: str, prefix: Optional[str], artifact_prefix: str, metadata: Dict[str, Any]) -> None:
59-
base_prefix = f"{prefix.rstrip('/')}/{artifact_prefix}" if prefix else artifact_prefix
62+
def _write_metadata(
63+
bucket: str, prefix: Optional[str], artifact_prefix: str, metadata: Dict[str, Any]
64+
) -> None:
65+
base_prefix = (
66+
f"{prefix.rstrip('/')}/{artifact_prefix}" if prefix else artifact_prefix
67+
)
6068
key = f"{base_prefix}/{METADATA_FILENAME}"
6169
metadata["metadata_key"] = key
6270
_S3_CLIENT.put_object(

0 commit comments

Comments
 (0)