-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathrotate_keys.py
174 lines (132 loc) · 5.89 KB
/
rotate_keys.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import boto3
import requests
import sys
import os
from base64 import b64encode
from nacl import encoding, public
# Names of the GitHub secrets that receive the rotated credentials; each one
# falls back to a default when its override variable is not set.
access_key_name = os.environ.get('GITHUB_ACCESS_KEY_NAME', 'access_key_id')
secret_key_name = os.environ.get('GITHUB_SECRET_KEY_NAME', 'secret_key_id')

# Credentials for the shared IAM client, taken from the environment. The
# session token is optional (None when AWS_SESSION_TOKEN is not set).
_aws_credentials = {
    'aws_access_key_id': os.environ['AWS_ACCESS_KEY_ID'],
    'aws_secret_access_key': os.environ['AWS_SECRET_ACCESS_KEY'],
    'aws_session_token': os.environ.get('AWS_SESSION_TOKEN'),
}
iam = boto3.client('iam', **_aws_credentials)
def main_function():
    """Rotate the IAM user's access key and publish the new one to GitHub.

    Reads ``PERSONAL_ACCESS_TOKEN`` and ``OWNER_REPOSITORY`` (a comma-separated
    list of repositories) from the environment. The IAM user is taken from
    ``IAM_USERNAME`` when that is set to a non-empty value; otherwise it is
    derived from the calling credentials via ``who_am_i()``.

    Exits with status 1 when the user already has two access keys. Otherwise
    a new key is created, encrypted and uploaded to every listed repository,
    the previous key (if there was exactly one) is deleted, and the process
    exits with status 0.
    """
    # Resolve the caller identity lazily: passing who_am_i() as the default of
    # os.environ.get() would call STS on every run, even when IAM_USERNAME is
    # provided. An empty IAM_USERNAME is treated the same as an unset one.
    iam_username = os.environ.get('IAM_USERNAME') or who_am_i()
    github_token = os.environ['PERSONAL_ACCESS_TOKEN']
    owner_repository = os.environ['OWNER_REPOSITORY']

    list_ret = iam.list_access_keys(UserName=iam_username)
    starting_num_keys = len(list_ret["AccessKeyMetadata"])

    # A new key can only be created while fewer than two keys exist.
    if starting_num_keys >= 2:
        print("There are already 2 keys for this user. Cannot rotate tokens.")
        sys.exit(1)
    print(f"Validated <2 keys exist (current count: {starting_num_keys}), proceeding.")

    # generate new credentials
    (new_access_key, new_secret_key) = create_new_keys(iam_username)

    for repos in [x.strip() for x in owner_repository.split(',')]:
        # Each repository has its own public key, so encrypt per repository.
        (public_key, pub_key_id) = get_pub_key(repos, github_token)

        encrypted_access_key = encrypt(public_key, new_access_key)
        encrypted_secret_key = encrypt(public_key, new_secret_key)

        upload_secret(repos, access_key_name, encrypted_access_key, pub_key_id, github_token)
        upload_secret(repos, secret_key_name, encrypted_secret_key, pub_key_id, github_token)

    # Remove the previous key only after every repository has the new one.
    if starting_num_keys == 1:
        delete_old_keys(iam_username, list_ret["AccessKeyMetadata"][0]["AccessKeyId"])

    sys.exit(0)
def who_am_i():
    """Return the trailing name of the ARN behind the configured credentials.

    Builds an STS client from the AWS_* environment variables, asks it for
    the caller identity, and returns the part of the reported ARN that
    follows its final "/".
    """
    credentials = {
        'aws_access_key_id': os.environ['AWS_ACCESS_KEY_ID'],
        'aws_secret_access_key': os.environ['AWS_SECRET_ACCESS_KEY'],
        'aws_session_token': os.environ.get('AWS_SESSION_TOKEN'),
    }
    sts_client = boto3.client('sts', **credentials)
    identity = sts_client.get_caller_identity()
    caller_arn = identity['Arn']
    # Text after the last "/" (the whole ARN when it contains no "/").
    return caller_arn.rpartition("/")[2]
def create_new_keys(iam_username):
    """Create an access key for *iam_username* and confirm that it is listed.

    Returns an ``(access_key_id, secret_access_key)`` tuple. Exits with
    status 1 if the new key id does not appear in a follow-up key listing.
    """
    created_key = iam.create_access_key(UserName=iam_username)['AccessKey']
    new_access_key = created_key['AccessKeyId']
    new_secret_key = created_key['SecretAccessKey']

    # List the user's keys again to verify the new one is really there.
    listing = iam.list_access_keys(UserName=iam_username)
    known_key_ids = {meta['AccessKeyId'] for meta in listing["AccessKeyMetadata"]}
    if new_access_key not in known_key_ids:
        print("new keys failed to generate.")
        sys.exit(1)

    print("new keys generated, proceeding")
    return (new_access_key, new_secret_key)
def delete_old_keys(iam_username, current_access_id):
    """Delete access key *current_access_id* of *iam_username*.

    Exits with status 1 when the response does not report HTTP status 200.
    """
    response = iam.delete_access_key(
        AccessKeyId=current_access_id,
        UserName=iam_username,
    )
    status_code = response['ResponseMetadata']['HTTPStatusCode']
    if status_code == 200:
        return
    print("deletion of original key failed")
    sys.exit(1)
# Update Actions Secret
# https://developer.github.com/v3/actions/secrets/#create-or-update-a-secret-for-a-repository
def encrypt(public_key: str, secret_value: str) -> str:
    """Seal *secret_value* for the holder of a Base64-encoded public key.

    Returns the sealed-box ciphertext as a Base64 text string.
    """
    recipient_key = public.PublicKey(
        public_key.encode("utf-8"),
        encoding.Base64Encoder(),
    )
    plaintext = secret_value.encode("utf-8")
    ciphertext = public.SealedBox(recipient_key).encrypt(plaintext)
    return b64encode(ciphertext).decode("utf-8")
def get_pub_key(owner_repo, github_token):
    """Fetch the public key GitHub uses to encrypt secrets for a repository.

    When ``GITHUB_ENVIRONMENT`` is set to a non-empty value, the public key
    of that environment is requested instead of the repository-level one.

    Args:
        owner_repo: Repository path segment placed after ``/repos/`` in the
            API URL.
        github_token: Token sent in the ``Authorization`` header.

    Returns:
        A ``(key, key_id)`` tuple taken from the JSON response.

    Raises:
        Exception: If the response status is not 200. The message carries
            the status code, response body and repository, never the token.
    """
    endpoint = f'https://api.github.com/repos/{owner_repo}/actions/secrets/public-key'
    if environment_name := os.environ.get('GITHUB_ENVIRONMENT'):
        endpoint = f'https://api.github.com/repos/{owner_repo}/environments/{environment_name}/secrets/public-key'

    pub_key_ret = requests.get(
        endpoint,
        headers={'Authorization': f"token {github_token}"},
        # Without a timeout a stalled connection would block the rotation forever.
        timeout=30,
    )

    if pub_key_ret.status_code != requests.codes.ok:
        # Do not include github_token here: exception text ends up in logs
        # and tracebacks.
        raise Exception(
            "github public key request failed, "
            f"status code: {pub_key_ret.status_code}, "
            f"body: {pub_key_ret.text}, "
            f"repository: {owner_repo}"
        )

    public_key_info = pub_key_ret.json()
    return (public_key_info['key'], public_key_info['key_id'])
def upload_secret(owner_repo, key_name, encrypted_value, pub_key_id, github_token):
    """Create or update one encrypted secret on GitHub.

    The secret is written to the repository, or to the environment named by
    ``GITHUB_ENVIRONMENT`` when that variable is set to a non-empty value.

    Args:
        owner_repo: Repository path segment placed after ``/repos/`` in the
            API URL.
        key_name: Name of the secret to write.
        encrypted_value: Secret value, already encrypted for this repository.
        pub_key_id: Id of the public key that was used for the encryption.
        github_token: Token sent in the ``Authorization`` header.

    Exits with status 1 when GitHub answers with anything other than
    201 or 204.
    """
    # Read the environment name once and reuse it for the URL and the log line.
    environment_name = os.environ.get('GITHUB_ENVIRONMENT')
    if environment_name:
        endpoint = f'https://api.github.com/repos/{owner_repo}/environments/{environment_name}/secrets/{key_name}'
    else:
        endpoint = f'https://api.github.com/repos/{owner_repo}/actions/secrets/{key_name}'

    updated_secret = requests.put(
        endpoint,
        json={
            'encrypted_value': encrypted_value,
            'key_id': pub_key_id
        },
        headers={'Authorization': f"token {github_token}"},
        # Without a timeout a stalled connection would block the rotation forever.
        timeout=30,
    )

    # status codes github says are valid
    good_status_codes = (201, 204)
    if updated_secret.status_code not in good_status_codes:
        print(f'Got status code: {updated_secret.status_code} on updating {key_name} in {owner_repo}')
        sys.exit(1)

    if environment_name:
        print(f'Updated: {key_name} in {owner_repo} for {environment_name}')
    else:
        print(f'Updated: {key_name} in {owner_repo}')
# Script entry point: start the key rotation as soon as this file is executed.
# NOTE(review): the call is unguarded, so importing this module would also
# trigger a rotation; consider an `if __name__ == "__main__":` guard if the
# module is ever imported rather than run directly.
main_function()