11from datetime import datetime , timezone
22from enum import StrEnum
3- from datetime import datetime
43from functools import cache
54import logging
65import os
76from typing import overload
87from typing_extensions import Literal
8+ import re
99
1010from bs4 import BeautifulSoup , Tag # type: ignore
1111from pydantic import BaseModel
@@ -29,11 +29,12 @@ def ET_verify() -> bool | str:
2929 return True
3030
3131
32- def ET_api_get (path : str ):
32+ def ET_api_get (path : str , * , params : dict | None = None ):
3333 response = requests_session ().get (
3434 f"{ ET_URL } /api/v1/{ path } " ,
3535 auth = HTTPSPNEGOAuth (opportunistic_auth = True ),
3636 verify = ET_verify (),
37+ params = params ,
3738 )
3839 response .raise_for_status ()
3940 return response .json ()
@@ -60,6 +61,12 @@ def ET_get_html(path: str):
6061 return response .text
6162
6263
64+ def get_utc_timestamp_from_str (timestamp_string : str ):
65+ return datetime .strptime (timestamp_string , "%Y-%m-%dT%H:%M:%SZ" ).replace (
66+ tzinfo = timezone .utc
67+ )
68+
69+
6370@overload
6471def get_erratum (erratum_id : str | int , full : Literal [False ] = False ) -> Erratum : ...
6572
@@ -89,9 +96,14 @@ def get_erratum(erratum_id: str | int, full: bool = False) -> Erratum | FullErra
8996 for jira_issue_data in jira_issues
9097 )
9198
92- last_status_transition_timestamp = datetime .strptime (
93- details ["status_updated_at" ], "%Y-%m-%dT%H:%M:%SZ"
94- ).replace (tzinfo = timezone .utc )
99+ last_status_transition_timestamp = get_utc_timestamp_from_str (
100+ details ["status_updated_at" ]
101+ )
102+ publish_date = (
103+ get_utc_timestamp_from_str (details ["publish_date" ])
104+ if details ["publish_date" ] is not None
105+ else None
106+ )
95107
96108 base_erratum = Erratum (
97109 id = details ["id" ],
@@ -100,6 +112,8 @@ def get_erratum(erratum_id: str | int, full: bool = False) -> Erratum | FullErra
100112 synopsis = details ["synopsis" ],
101113 status = ErrataStatus (details ["status" ]),
102114 all_issues_release_pending = all_issues_release_pending ,
115+ group_id = details ["group_id" ],
116+ publish_date = publish_date ,
103117 last_status_transition_timestamp = last_status_transition_timestamp ,
104118 )
105119
@@ -145,6 +159,146 @@ def get_erratum_for_link(link: str, full: bool = True) -> Erratum | FullErratum:
145159 return get_erratum (erratum_id , full = full )
146160
147161
162+ class RHELStream (StrEnum ):
163+ Z = "Z"
164+ GA = "GA"
165+
166+
167+ class RHELVersion (BaseModel ):
168+ major_version : int
169+ minor_version : int
170+ stream : RHELStream
171+
172+ def __str__ (self ):
173+ match self .major_version :
174+ case 10 :
175+ return f"RHEL-{ self .major_version } .{ self .minor_version } .{ self .stream } "
176+ case 9 | 8 :
177+ if self .stream == RHELStream .Z :
178+ if self .minor_version % 2 == 1 :
179+ return (
180+ f"RHEL-{ self .major_version } .{ self .minor_version } .0.Z.MAIN"
181+ )
182+ else :
183+ return f"RHEL-{ self .major_version } .{ self .minor_version } .0.Z.MAIN+EUS"
184+ return f"RHEL-{ self .major_version } .{ self .minor_version } .0.GA"
185+ case _:
186+ return NotImplemented
187+
188+ def __sub__ (self , other : int ):
189+ if not isinstance (other , int ):
190+ return NotImplemented
191+
192+ if self .stream == RHELStream .Z :
193+ return RHELVersion (
194+ major_version = self .major_version ,
195+ minor_version = self .minor_version ,
196+ stream = RHELStream ("GA" ),
197+ )
198+
199+ return RHELVersion (
200+ major_version = self .major_version ,
201+ minor_version = self .minor_version - 1 ,
202+ stream = RHELStream ("Z" ),
203+ )
204+
205+
206+ class RHELRelease (BaseModel ):
207+ version : str
208+ # None means already shipped
209+ ship_date : datetime | None
210+
211+
212+ def get_RHEL_version (version_string : str ):
213+ pattern = r"RHEL-(\d+)\.(\d+)(\.\d+)?\.(Z|GA)"
214+ match = re .match (pattern , version_string )
215+ if match is not None :
216+ return RHELVersion (
217+ major_version = int (match .group (1 )),
218+ minor_version = int (match .group (2 )),
219+ stream = RHELStream (match .group (4 )),
220+ )
221+
222+
223+ def get_RHEL_release (param : int | str ):
224+ response = (
225+ ET_api_get ("releases" , params = {"filter[id]" : param })
226+ if isinstance (param , int )
227+ else ET_api_get ("releases" , params = {"filter[name]" : param })
228+ )
229+ release_data = response ["data" ][0 ]
230+
231+ ship_date_string = release_data ["attributes" ]["ship_date" ]
232+ ship_date = (
233+ datetime .strptime (ship_date_string , "%Y-%m-%dT%H:%M:%SZ" ).replace (
234+ tzinfo = timezone .utc
235+ )
236+ if ship_date_string
237+ else None
238+ )
239+
240+ return RHELRelease (version = release_data ["attributes" ]["name" ], ship_date = ship_date )
241+
242+
243+ def get_previous_erratum (current_erratum_id : str | int , package_name : str ):
244+ errata = get_erratum (current_erratum_id )
245+
246+ target_release = get_RHEL_release (errata .group_id )
247+ cur_version = get_RHEL_version (target_release .version )
248+ if cur_version is None :
249+ logger .info (f"Unknow RHEL release format: { target_release .version } " )
250+ return None
251+
252+ related_erratum = ET_api_get (f"packages/{ package_name } " )["data" ]["relationships" ][
253+ "errata"
254+ ]
255+ assert isinstance (related_erratum , list )
256+
257+ rel_prep_lookup = dict ()
258+
259+ for errata in related_erratum :
260+ if errata ["status" ] != ErrataStatus .REL_PREP :
261+ continue
262+
263+ id = errata ["id" ]
264+ cur_errata = get_erratum (id )
265+ cur_release = get_RHEL_release (cur_errata .group_id )
266+
267+ # If the target release of REL_PREP is same as current target release
268+ # Pick it without checking publish date
269+ if cur_release .version == target_release .version :
270+ logger .info ("Exactly same target release from REL_PREP" ) # testing purpose
271+ return get_erratum (id )
272+
273+ rel_prep_lookup [cur_release .version ] = {
274+ "id" : id ,
275+ "publish_date" : cur_errata .publish_date ,
276+ }
277+
278+ cur_time = datetime .now (tz = timezone .utc )
279+ while cur_version .minor_version >= 0 :
280+ # No need to check REL_PREP if the target release has been released
281+ if target_release .ship_date is not None and str (cur_version ) in rel_prep_lookup :
282+ rel_prep = rel_prep_lookup [str (cur_version )]
283+ if rel_prep ["publish_date" ] is None :
284+ logger .info (
285+ f"Encounter REL_PREP errata without publish_date -> Errata ID: { rel_prep ['id' ]} "
286+ )
287+ elif rel_prep ["publish_date" ] <= target_release .ship_date :
288+ logger .info ("From REL_PREP" ) # testing purpose
289+ return get_erratum (rel_prep ["id" ])
290+
291+ release = get_RHEL_release (str (cur_version ))
292+ if release .ship_date is None or release .ship_date <= cur_time :
293+ released_build = ET_api_get (
294+ f"product_versions/{ release .version } /released_builds/{ package_name } "
295+ )
296+ logger .info ("From released build" ) # testing purpose
297+ return get_erratum (released_build ["errata_id" ])
298+
299+ cur_version -= 1
300+
301+
148302class RuleParseError (Exception ):
149303 pass
150304
0 commit comments