-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsprint-updates-bot.py
357 lines (310 loc) · 14.8 KB
/
sprint-updates-bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
import os
import requests
import datetime
import logging
import re
import argparse
import ssl
import certifi
import time
from dotenv import load_dotenv
from openai import OpenAI # Assumed custom OpenAI client wrapper
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
# Load environment variables via python-dotenv's load_dotenv() before any
# configuration below is read (do not clear the entire os.environ).
load_dotenv()
def get_env_variable(var_name):
    """
    Return the value of a required environment variable.

    Args:
        var_name: Name of the environment variable to read.

    Returns:
        The variable's non-empty string value.

    Raises:
        EnvironmentError: If the variable is unset or set to an empty string
            (the problem is also logged at ERROR level).
    """
    value = os.getenv(var_name)
    if value:
        return value

    # Unset and empty are treated the same: both mean "not configured".
    logging.error(f"Environment variable {var_name} is not set or is empty.")
    raise EnvironmentError(f"Missing required environment variable: {var_name}")
# Logging is configured first so that failures while reading the required
# settings below are reported in the same format as the rest of the run.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Settings. The four get_env_variable() lookups are mandatory and raise at
# import time when missing; SLACK_CHANNEL falls back to a default.
GITHUB_API_URL = "https://api.github.com"
GH_TOKEN = get_env_variable("GH_TOKEN")
OPENAI_API_KEY = get_env_variable("OPENAI_API_KEY")
ASSISTANT_ID = get_env_variable("ASSISTANT_ID")
SLACK_TOKEN = get_env_variable("SLACK_TOKEN")
SLACK_CHANNEL = os.getenv("SLACK_CHANNEL", "defactor-internal")

# API clients shared by the functions below. The Slack client is given an SSL
# context built from certifi's CA bundle.
client = OpenAI(api_key=OPENAI_API_KEY)
slack_client = WebClient(
    token=SLACK_TOKEN,
    ssl=ssl.create_default_context(cafile=certifi.where()),
)

# Repositories to scan, as {'owner': ..., 'name': ...} dicts in report order.
REPOS = [
    {'owner': 'defactor-com', 'name': repo_name}
    for repo_name in (
        'assets-webapp',
        'assets-backend',
        'pools-webapp',
        'pools-backend',
        'engage-webapp',
        'engage-backend',
        'ui-kit',
        'sdk',
        'ipfs',
        'documentation',
    )
]
# Helper functions
def get_sprint_info(target_date=None):
    """
    Calculate sprint information for the sprint containing a target date.

    Sprints are consecutive 14-day windows counted from a fixed reference
    sprint (Sprint 29, starting 2025-01-27). Dates before the reference map to
    lower sprint numbers because floor division is used.

    Args:
        target_date: A datetime.date to look up. Defaults to today's date.

    Returns:
        A tuple (sprint_start, sprint_end, sprint_number, is_sprint_complete,
        is_sprint_start_day) where:
          - sprint_start / sprint_end are the first and last day (inclusive)
            of the sprint containing target_date;
          - is_sprint_complete is True when that sprint has already ended as
            of today;
          - is_sprint_start_day is True when target_date is the sprint's first
            day (and a Monday).
    """
    SPRINT_29_START = datetime.date(2025, 1, 27)  # Reference sprint start date
    SPRINT_29_NUMBER = 29  # Reference sprint number
    SPRINT_LENGTH_DAYS = 14

    today = datetime.date.today()
    if target_date is None:
        target_date = today
    logging.info(f"Calculating sprint info for date: {target_date.strftime('%B %d, %Y')}")

    # Number of whole sprints between the reference sprint and the target
    # date (negative for dates before the reference).
    sprints_elapsed = (target_date - SPRINT_29_START).days // SPRINT_LENGTH_DAYS

    sprint_number = SPRINT_29_NUMBER + sprints_elapsed
    sprint_start = SPRINT_29_START + datetime.timedelta(days=SPRINT_LENGTH_DAYS * sprints_elapsed)
    sprint_end = sprint_start + datetime.timedelta(days=SPRINT_LENGTH_DAYS - 1)

    # Completion must be judged against the current date: target_date always
    # lies inside [sprint_start, sprint_end], so comparing it with sprint_end
    # could never report a finished sprint.
    is_sprint_complete = today > sprint_end
    is_sprint_start_day = target_date.weekday() == 0 and target_date == sprint_start

    logging.info(f"Sprint {sprint_number}: {sprint_start.strftime('%B %d')} - {sprint_end.strftime('%B %d, %Y')}")
    logging.info(f"Sprint status: {'Complete' if is_sprint_complete else 'In Progress'}")
    return sprint_start, sprint_end, sprint_number, is_sprint_complete, is_sprint_start_day
def get_previous_sprint_dates():
    """
    Pick the sprint to report on, based on today's date.

    When today is the first day of a sprint, the sprint that just ended is
    selected; on any other day the sprint currently in progress is selected.

    Returns:
        A tuple (start_date, end_date, sprint_number) for the selected sprint.
    """
    sprint_start, sprint_end, sprint_number, _, on_start_day = get_sprint_info(datetime.date.today())

    if on_start_day:
        # On sprint start day (Monday), step back to the sprint that just ended.
        selected = (
            sprint_start - datetime.timedelta(days=14),
            sprint_start - datetime.timedelta(days=1),
            sprint_number - 1,
        )
    else:
        # On any other day, report on the sprint in progress.
        selected = (sprint_start, sprint_end, sprint_number)

    selected_start, selected_end, selected_number = selected
    logging.info(f"Selected Sprint {selected_number}")
    logging.info(f"Sprint start date: {selected_start.strftime('%B %d, %Y')}")
    logging.info(f"Sprint end date: {selected_end.strftime('%B %d, %Y')}")
    return selected
def github_api_request(url, params=None, timeout=30):
    """
    Make an authenticated GitHub API GET request and return the JSON response.

    Args:
        url: Full URL of the GitHub API endpoint.
        params: Optional dict of query-string parameters.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, requests waits indefinitely on a stalled connection.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: If the response status is 4xx/5xx.
        requests.exceptions.Timeout: If the server does not respond in time.
    """
    headers = {"Authorization": f"token {GH_TOKEN}"}
    response = requests.get(url, headers=headers, params=params, timeout=timeout)
    logging.debug(f"GitHub API request URL: {response.url}")
    response.raise_for_status()
    return response.json()
def get_pull_requests(owner: str, repo: str, start_date: datetime.date, end_date: datetime.date) -> list:
    """
    Return the pull requests of a repository last updated within a date range.

    Pull requests are requested in all states, sorted by last update, newest
    first. Because of that ordering, paging stops as soon as a pull request
    older than start_date is seen instead of downloading the repository's
    entire pull request history.

    Args:
        owner: Repository owner (user or organisation).
        repo: Repository name.
        start_date: First day (inclusive) of the range.
        end_date: Last day (inclusive) of the range.

    Returns:
        A list of pull request dicts as returned by the GitHub API, in the
        order received (most recently updated first).
    """
    pulls_url = f"{GITHUB_API_URL}/repos/{owner}/{repo}/pulls"
    per_page = 100
    relevant_pulls = []
    page = 1

    while True:
        params = {"state": "all", "sort": "updated", "direction": "desc", "per_page": per_page, "page": page}
        page_data = github_api_request(pulls_url, params)
        if not page_data:
            break

        reached_older_pulls = False
        for pr in page_data:
            updated_on = datetime.datetime.strptime(pr["updated_at"], "%Y-%m-%dT%H:%M:%SZ").date()
            if updated_on < start_date:
                # Everything after this entry is older still.
                reached_older_pulls = True
                break
            if updated_on <= end_date:
                relevant_pulls.append(pr)

        # A short page means this was the last one.
        if reached_older_pulls or len(page_data) < per_page:
            break
        page += 1

    return relevant_pulls
def get_issue_details(owner, repo, issue_number):
    """
    Fetch a single issue of a repository from the GitHub API.

    Args:
        owner: Repository owner.
        repo: Repository name.
        issue_number: Number of the issue to look up.

    Returns:
        Whatever github_api_request() returns for the issue endpoint.
    """
    return github_api_request(f"{GITHUB_API_URL}/repos/{owner}/{repo}/issues/{issue_number}")
# A linking keyword at a word boundary, followed on the same line by the
# nearest "#<n>" or "GH-<n>". The gap is matched lazily so that the closest
# reference after the keyword is the one captured.
_ISSUE_REFERENCE_RE = re.compile(
    r'\b(?:fixes|closes|resolves|references|refs|fix|close|resolve)[^\n\r#]*?(?:#|GH-)(\d+)',
    re.IGNORECASE,
)


def extract_issue_numbers(pr_body):
    """
    Extract issue numbers referenced in the PR body.

    A reference is a keyword (fix/fixes, close/closes, resolve/resolves,
    refs/references, case-insensitive, including inflections such as "fixed")
    followed on the same line by "#<number>" or "GH-<number>". The keyword
    must start a word, so text like "prefix #4" is not treated as a reference.

    Args:
        pr_body: The pull request description, or None/empty.

    Returns:
        A list of unique issue numbers (ints) in order of first appearance;
        empty when the body is missing or contains no references.
    """
    if not pr_body:
        return []

    matches = _ISSUE_REFERENCE_RE.findall(pr_body)
    # dict.fromkeys drops repeats while keeping first-seen order.
    issue_numbers = list(dict.fromkeys(int(number) for number in matches))
    logging.debug(f"Extracted issue references: {issue_numbers}")
    return issue_numbers
_NO_DESCRIPTION = "No description provided."

# Template sections dropped from the description; each runs from its heading
# to the next blank line (or the end of the text).
_PR_SECTION_PATTERNS = (
    re.compile(r'### Steps to test.*?(?:\n\n|$)', re.DOTALL),
    re.compile(r'#### CheckList.*?(?:\n\n|$)', re.DOTALL),
)

# Whole lines dropped from the description: numbered list items and task-list
# checkboxes. Anchored to the start of a line so that mid-sentence text such as
# "version 2. Done" is left alone.
_PR_LINE_PATTERNS = (
    re.compile(r'^[ \t]*\d+\.[ \t].*(?:\n|$)', re.MULTILINE),
    re.compile(r'^[ \t]*- \[[ xX]\].*(?:\n|$)', re.MULTILINE),
)


def clean_pr_body(pr_body):
    """
    Remove unwanted sections from the PR description.

    Strips the "### Steps to test" and "#### CheckList" sections, numbered
    list items and task-list checkbox lines, then trims surrounding
    whitespace.

    Args:
        pr_body: The raw pull request description, or None/empty.

    Returns:
        The cleaned description (with line endings normalised to "\\n"), or
        "No description provided." when there is no body or nothing is left
        after cleaning.
    """
    if not pr_body:
        return _NO_DESCRIPTION

    # Normalise CRLF/CR line endings first: the section patterns look for a
    # "\n\n" blank line, which never occurs in a CRLF body and would otherwise
    # make a section swallow everything up to the end of the text.
    cleaned = pr_body.replace('\r\n', '\n').replace('\r', '\n')

    for pattern in _PR_SECTION_PATTERNS + _PR_LINE_PATTERNS:
        cleaned = pattern.sub('', cleaned)

    return cleaned.strip() or _NO_DESCRIPTION
def generate_summary(repos, start_date, end_date, sprint_number):
    """
    Generate a markdown summary of the sprint updates.

    For every repository, lists each pull request updated within the date
    range (title and cleaned description) followed by the issues its
    description references. Within one repository each issue is detailed only
    once, under the first pull request that references it.

    Args:
        repos: Iterable of dicts with 'owner' and 'name' keys.
        start_date: First day (inclusive) of the reporting range.
        end_date: Last day (inclusive) of the reporting range.
        sprint_number: Sprint number shown in the heading.

    Returns:
        The summary as a single markdown string.
    """
    # Collect fragments and join once instead of growing a string with +=.
    parts = [
        f"# Development Sprint Updates (Sprint {sprint_number}: {start_date.strftime('%B %d')} - {end_date.strftime('%B %d, %Y')})\n\n"
    ]

    for repo in repos:
        owner = repo['owner']
        name = repo['name']
        logging.info(f"Processing repository: {owner}/{name}")
        parts.append(f"## Repository: {owner}/{name}\n\n")

        pulls = get_pull_requests(owner, name, start_date, end_date)
        if not pulls:
            parts.append("• No activity in this repository during this week.\n\n")
            continue

        # Track which issues have been added to avoid duplicates
        issues_seen = set()
        for pr in pulls:
            # The API reports an empty description as null; the helpers below
            # both accept None.
            pr_body = pr.get('body')
            cleaned_body = clean_pr_body(pr_body)
            # Instead of including PR numbers, we only show the title and description.
            parts.append(f"• *{pr['title']}*\n")
            parts.append(f" - Description: {cleaned_body}\n")

            issue_numbers = extract_issue_numbers(pr_body)
            if issue_numbers:
                parts.append(" - Referenced Issues:\n")
                for issue_number in issue_numbers:
                    if issue_number in issues_seen:
                        continue
                    try:
                        issue_details = get_issue_details(owner, name, issue_number)
                    except requests.exceptions.HTTPError as e:
                        # Best effort: a missing/inaccessible issue must not
                        # abort the whole report.
                        logging.error(f"Failed to fetch details for issue #{issue_number} in {owner}/{name}: {e}")
                        continue
                    # `or` (rather than a .get default) also covers an issue
                    # whose body is present but null, which would otherwise be
                    # rendered as the text "None".
                    issue_body = issue_details.get('body') or 'No details provided.'
                    parts.append(f" • {issue_details['title']} – {issue_body}\n")
                    issues_seen.add(issue_number)
            else:
                parts.append(" - No issues referenced.\n")
            parts.append("\n")

    return "".join(parts)
def save_summary_to_file(summary, filename):
    """
    Save the summary text to a file, replacing any existing content.

    The file is always written as UTF-8: summaries contain non-ASCII
    characters (bullets, dashes, emoji from PR descriptions) that the
    platform's default encoding may not be able to represent.

    Args:
        summary: Text to write.
        filename: Path of the file to create or overwrite.
    """
    with open(filename, "w", encoding="utf-8") as file:
        file.write(summary)
    logging.info(f"Summary saved to {filename}")
def format_for_slack(text):
    """
    Convert markdown formatting to Slack-friendly formatting.

    - Markdown headers of any level ("#" to "######") become bold lines.
    - Markdown bold (**text**) becomes Slack bold (*text*).
    - Runs of blank lines collapse to a single blank line.
    - Leading and trailing whitespace is removed.

    Args:
        text: Markdown text.

    Returns:
        The text rewritten with Slack's mrkdwn conventions.
    """
    # Slack has no header syntax, so every header level is rendered as bold.
    text = re.sub(r'^#{1,6} (.*?)$', r'*\1*', text, flags=re.MULTILINE)
    # Replace markdown bold with Slack bold
    text = re.sub(r'\*\*(.*?)\*\*', r'*\1*', text)
    # Normalize list spacing
    text = re.sub(r'\n\n+', '\n\n', text)
    return text.strip()
def _first_assistant_text(messages):
    """Return the first text part of the first assistant message that has one, else None."""
    for message in messages:
        if message.role != "assistant":
            continue
        for part in message.content:
            # Content parts are not guaranteed to be text, so probe for it.
            text = getattr(part, "text", None)
            if text is not None:
                return text.value
    return None


def get_polished_summary(summary: str, sprint_number: int) -> str:
    """
    Use the assistant to reformat the summary for Slack.

    Sends the raw summary to the configured assistant, waits up to 60 seconds
    for the run to finish and returns the assistant's reply passed through
    format_for_slack(). If the run times out, ends in a non-completed state,
    yields no text reply, or the OpenAI API raises an error, the original
    summary is returned (also passed through format_for_slack()) so the
    update can still be posted.

    Args:
        summary: The raw markdown summary.
        sprint_number: Sprint number mentioned in the prompt.

    Returns:
        Slack-formatted text: the assistant's reply, or the formatted raw
        summary as a fallback.
    """
    # Imported here so the block carries its own dependency; the openai
    # package is already required by this module.
    from openai import OpenAIError

    assistant_prompt = f"""
Please reformat the following development update for Sprint {sprint_number} into a concise, well-structured Slack message for our marketing team.
Organize the updates by repository (each repository under its own header).
Use bullet points for key updates but do not nest them (avoid double bullet points).
Remove any references to PR numbers and issue numbers.
Rewrite each bullet point's title to be more human-friendly based on our commit message guidelines.
This update will be published on the defactor Technology & Innovation section of our internal site: https://inside.defactor.com.
{summary}
"""
    timeout = 60  # maximum seconds to wait (tweak as needed)
    poll_interval = 1  # seconds between status checks
    # Statuses from which the run will not reach 'completed' on its own
    # ('requires_action' waits for tool output this script never sends).
    unsuccessful_statuses = ('failed', 'cancelled', 'expired', 'incomplete', 'requires_action')

    try:
        thread = client.beta.threads.create()
        client.beta.threads.messages.create(
            thread_id=thread.id,
            role="user",
            content=assistant_prompt
        )
        run = client.beta.threads.runs.create(thread_id=thread.id, assistant_id=ASSISTANT_ID)

        # Monotonic clock: elapsed time is unaffected by system clock changes.
        start_time = time.monotonic()
        while True:
            if time.monotonic() - start_time > timeout:
                logging.error("Assistant run timed out!")
                return format_for_slack(summary)
            run_status = client.beta.threads.runs.retrieve(thread_id=thread.id, run_id=run.id)
            if run_status.status == 'completed':
                break
            if run_status.status in unsuccessful_statuses:
                logging.error(f"Assistant run failed with status: {run_status.status}")
                return format_for_slack(summary)
            time.sleep(poll_interval)

        messages = client.beta.threads.messages.list(thread_id=thread.id)
        polished = _first_assistant_text(messages)
    except OpenAIError as e:
        # Honour the documented fallback instead of aborting the whole run.
        logging.error(f"Assistant request failed: {e}")
        return format_for_slack(summary)

    if polished is None:
        return format_for_slack(summary)
    return format_for_slack(polished)
def post_to_slack(channel, message):
    """
    Send a message to a Slack channel, in consecutive chunks if it is long.

    The text is cut into pieces of at most 40000 characters, posted in order.
    A Slack API error is logged and stops the posting of any remaining
    pieces; it is not re-raised.

    Args:
        channel: Slack channel to post to.
        message: Text to post.
    """
    chunk_size = 40000  # Slack's maximum message length
    try:
        for offset in range(0, len(message), chunk_size):
            chunk = message[offset:offset + chunk_size]
            result = slack_client.chat_postMessage(channel=channel, text=chunk, parse='full')
            logging.info(f"Message posted to {channel}: {result['ts']}")
    except SlackApiError as err:
        logging.error(f"Error posting message to Slack: {err.response['error']}")
def main(repos, start_date=None, end_date=None, sprint_number=None):
    """
    Run the full reporting workflow.

    Steps:
      1. Resolve the date range and sprint number (from the arguments, or
         automatically from today's date when no range is given).
      2. Generate the raw summary and save it to a file.
      3. Ask the assistant for a polished version and save that too.
      4. Post the polished summary to the configured Slack channel.

    Args:
        repos: Repositories to scan (dicts with 'owner' and 'name').
        start_date: Optional first day of the range; used only together
            with end_date.
        end_date: Optional last day of the range.
        sprint_number: Optional sprint number override for a manual range.
    """
    if not (start_date and end_date):
        # No complete manual range: derive everything from today's date.
        start_date, end_date, sprint_number = get_previous_sprint_dates()
    else:
        if not sprint_number:
            # Derive the sprint number from the sprint containing start_date.
            sprint_number = get_sprint_info(start_date)[2]
        logging.info(f"Using provided dates: {start_date.strftime('%B %d, %Y')} to {end_date.strftime('%B %d, %Y')}")
        logging.info(f"Using sprint number: {sprint_number}")

    logging.info(f"Generating summary for Sprint {sprint_number}")
    if datetime.date.today() <= end_date:
        logging.warning(f"Generating report for incomplete Sprint {sprint_number}")

    file_stem = f"Development_Sprint_{sprint_number}_Updates_{start_date}_to_{end_date}"
    raw_filename = f"{file_stem}_raw.txt"
    polished_filename = f"{file_stem}_polished.txt"

    summary = generate_summary(repos, start_date, end_date, sprint_number)
    logging.info(f"Saving raw summary to: {raw_filename}")
    save_summary_to_file(summary, raw_filename)

    logging.info(f"Getting polished summary from assistant for Sprint {sprint_number}")
    polished_summary = get_polished_summary(summary, sprint_number)
    logging.info(f"Saving polished summary to: {polished_filename}")
    save_summary_to_file(polished_summary, polished_filename)

    logging.info(f"Posting Sprint {sprint_number} update to Slack channel: {SLACK_CHANNEL}")
    post_to_slack(SLACK_CHANNEL, polished_summary)
if __name__ == "__main__":
    # Command-line entry point: optional explicit date range and sprint number.
    cli = argparse.ArgumentParser(description='Generate sprint update report from GitHub.')
    cli.add_argument('--start_date', type=str, help='Start date in YYYY-MM-DD format')
    cli.add_argument('--end_date', type=str, help='End date in YYYY-MM-DD format')
    cli.add_argument('--sprint_number', type=int, help='Optional: Override sprint number')
    options = cli.parse_args()

    try:
        # The two dates only make sense as a pair.
        if bool(options.start_date) != bool(options.end_date):
            raise ValueError("Both start_date and end_date must be provided together.")

        range_start = None
        if options.start_date:
            range_start = datetime.datetime.strptime(options.start_date, '%Y-%m-%d').date()
        range_end = None
        if options.end_date:
            range_end = datetime.datetime.strptime(options.end_date, '%Y-%m-%d').date()

        main(REPOS, range_start, range_end, options.sprint_number)
    except Exception as e:
        # Log for the record, then re-raise so the failure is not hidden.
        logging.error(f"Error running script: {str(e)}")
        raise