-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
119 lines (101 loc) · 4.04 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import boto3
import streamlit as st
from typing import Sequence
from streamlit_tree_select import tree_select
from streamlit_ace import st_ace, LANGUAGES
from dotenv import load_dotenv
from difflib import get_close_matches
import os
load_dotenv()
st.set_page_config(layout='wide')
if 'client' not in st.session_state:
st.session_state.client = None
if 'traverse' not in st.session_state:
st.session_state.traverse = False
def get_folder(repository: str, folder: str) -> dict:
return st.session_state.client.get_folder(repositoryName=repository, folderPath=folder)
def get_file(repository: str, filepath: str) -> dict:
return st.session_state.client.get_file(repositoryName=repository, filePath=filepath)
def add_files(response: dict) -> None:
return [
{'label': f['relativePath'], 'value': f['absolutePath']}
for f in response['files']
]
def traverse_folder(repository: str, absolute_path: str) -> Sequence:
response = get_folder(repository, absolute_path)
files = add_files(response)
children = []
for folder in response['subFolders']:
response = get_folder(repository, folder['absolutePath'])
children.append(
{
'label': folder['relativePath'],
'value': folder['absolutePath'],
'children': traverse_folder(repository, folder['absolutePath'])
}
)
children.extend(files)
return children
@st.experimental_memo
def traverse_repo(repository: str) -> Sequence:
return [
{
'label': repository,
'value': repository,
'children': traverse_folder(repository, '/')
}
]
def put_search_interface(nodes: Sequence) -> dict:
with st.sidebar:
response = tree_select(nodes, only_leaf_checkboxes=True, check_model='leaf')
return response
@st.experimental_memo
def list_repositories() -> Sequence:
response = st.session_state.client.list_repositories(sortBy='repositoryName', order='ascending')
repos = response['repositories']
while 'nextToken' in response:
response = st.session_state.client.list_repositories(nextToken=response['nextToken'], sortBy='repositoryName', order='ascending')
repos.extend(response['repositories'])
return repos
def put_form() -> str:
with st.sidebar.form('Input'):
default_profile, default_region = os.environ.get('AWS_PROFILE'), os.environ.get('AWS_REGION')
if default_profile and default_region:
profile_name, region_name = default_profile, default_region
else:
profile_name = st.text_input(label='AWS Profile')
region_name = st.text_input(label='AWS Region')
boto3_session = boto3.Session(profile_name=profile_name, region_name=region_name)
st.session_state.client = boto3_session.client('codecommit')
repos = list_repositories()
selection = st.selectbox(
label='CodeCommit repo',
options=repos,
format_func=lambda x: x['repositoryName']
)
if st.form_submit_button('Explore') or st.session_state.traverse:
st.session_state.traverse = True
return selection['repositoryName']
def main():
repository = put_form()
if repository is not None:
with st.sidebar:
nodes = traverse_repo(repository)
response = put_search_interface(nodes)
if not response.get('checked'):
st.stop()
elif len(response['checked']) > 1:
st.caption('⚠️ Please select one file at a time.')
else:
filename = response['checked'].pop()
selected_file = get_file(repository, filename)
content = selected_file['fileContent'].decode('utf-8')
languages = get_close_matches(filename.split('.')[-1], LANGUAGES, cutoff=0.5)
st_ace(
value=content,
readonly=True,
language=languages[0] if languages else 'text',
theme='tomorrow_night_eighties',
)
if __name__ == '__main__':
main()