
Smart oauth2 token handling for staging #906

Draft
mcolin16 wants to merge 50 commits into develop

Conversation

mcolin16 (Contributor)

…a file

@mcolin16 mcolin16 marked this pull request as draft January 30, 2025 14:26
@mcolin16 mcolin16 self-assigned this Jan 30, 2025
@mcolin16 mcolin16 changed the title feat: call the function to get or refresh a token each time we stage … Smart oauth2 token handling for staging Jan 30, 2025
@github-advanced-security

This pull request sets up GitHub code scanning for this repository. Once the scans have completed and the checks have passed, the analysis results for this pull request branch will appear on this overview. Once you merge this pull request, the 'Security' tab will show more code scanning analysis results (for example, for the default branch). Depending on your configuration and choice of analysis tool, future pull requests will be annotated with code scanning analysis results. For more information about GitHub code scanning, check out the documentation.

headers=prepare_headers(external_auth_config),
)
if response.status_code != HTTP_200_OK:
logger.error(
Contributor

Could you use rs_server_common.utils.utils2.log_http_exception to avoid copy/pasting the message?
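
For illustration, a minimal sketch of what that could look like; the exact signature of log_http_exception is not shown in this diff and is assumed here (hypothetical: a logger plus a message):

    from rs_server_common.utils import utils2

    if response.status_code != HTTP_200_OK:
        # Hypothetical usage: check the real signature in rs_server_common.utils.utils2;
        # it is assumed here to log the message and handle the HTTP error in one place.
        utils2.log_http_exception(
            logger,
            f"Token request failed with status {response.status_code}",
        )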


Quality Gate failed

Failed conditions
1 Security Hotspot
3.2% Duplication on New Code (required ≤ 3%)
C Reliability Rating on New Code (required ≥ A)

See analysis details on SonarQube Cloud


@mcolin16 mcolin16 requested review from agrosu1978 and Padeanu March 11, 2025 13:15
@@ -46,6 +49,59 @@
ACCESS_TK_KEY_IN_RESPONSE = "access_token"
HEADER_CONTENT_TYPE = "application/x-www-form-urlencoded"

# Mandatory attributed that should be present in the token dictionary
Contributor

Maybe "attributed" should be replaced by "attributes"? There are two places where this word appears. Suggestion:
# Mandatory attributes that must be present in the token dictionary:
# Note: Additional attributes are included beyond those returned by the station response.
# These extra attributes include the creation date of both the access token and refresh token.
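
As a hedged sketch of the same idea, the constant and a check could look like this (key names other than access_token and access_token_creation_date are assumptions based on the comment above):

    # Mandatory attributes that must be present in the token dictionary.
    # The two *_creation_date keys are added on top of the station response.
    MANDATORY_TOKEN_ATTRIBUTES = [
        "access_token",
        "refresh_token",                  # assumed key name
        "access_token_creation_date",
        "refresh_token_creation_date",    # assumed key name
    ]

    def check_token_attributes(token_dict) -> bool:
        """Return True if all mandatory attributes are present in the token dictionary."""
        return isinstance(token_dict, dict) and all(
            attr in token_dict for attr in MANDATORY_TOKEN_ATTRIBUTES
        )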

Check if the token variable contains the mandatory attributes

Args:
token_dict (Any):
Contributor

The rs-documentation will complain when the documentation is built with mkdocs:
token_dict (Any): -> add a description
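
A possible fix, sketched with an assumed wording for the description:

    Args:
        token_dict (Any): Dictionary holding the station token and its metadata
            (access/refresh tokens and their creation dates), or None on the first call.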

"""
Retrieve and validate an authentication token for a specific station and service.
Thee are two main use cases:
Contributor

Typo: "There"

current_date = datetime.datetime.now()
diff_in_sec = (current_date - token_dict["access_token_creation_date"]).total_seconds()

logger.info(f"----------- DIFF VAUT: {diff_in_sec}")
Contributor

Maybe a more meaningful message should be written here: this is a logger.info, not a logger.debug.
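
For example, something along these lines (the wording is only a suggestion, using the variable already in scope):

    logger.info(f"Time elapsed since the access token was created: {diff_in_sec:.0f} seconds")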

while attempt < max_retries:
try:
self.connect_s3()
self.logger.info(f"Starting the streaming of {stream_url} to s3://{bucket}/{key}")

if not token_lock:
Contributor

This is a check on an input parameter; it should be placed at the beginning of the function.
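
A minimal sketch of what that could look like, using the parameter list visible in the call shown further down in this conversation (the error type raised is an assumption):

    def s3_streaming_upload(self, stream_url, config, bucket, key, token_dict, token_lock):
        # Validate input parameters before connecting to S3 and entering the retry loop.
        if not token_lock:
            raise ValueError("token_lock must be provided for the streaming upload")
        ...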

staging_station_id: identifier of the station on which we want to stage data.
This variable will ve used to define the dask.distributed.Variable object used
to create an access_token shared by all of the Dask workers (there will be as many
shared variable as station involved in the staging process)
Contributor

Typo. Suggestion: 'as the number of stations involved'

try:
token_dict = token_info.get()
# Get/refresh the access token if necessary
token_dict = get_station_token(config, token_dict)
Contributor

The dask Variable should be updated in the monitoring thread, or in a new thread created specifically for this purpose in processors.py of the staging service. Here, the token should be provided by the streaming task function, which fetches the value from the shared dask Variable. The Lock should also be used within the streaming task function (from processors.py) and not here in the s3_storage_handler; in fact, this function should not be changed at all. The former 'auth' param (from the previous implementation of this function) is exactly that updated token, fetched and provided by the streaming task function. Reason for this comment: redundant refreshes across multiple workers / redundant runs of the get_station_token function.
See below the comment for the streaming_task function in processors.py.

s3_handler = S3StorageHandler(
os.environ["S3_ACCESSKEY"],
os.environ["S3_SECRETKEY"],
os.environ["S3_ENDPOINT"],
os.environ["S3_REGION"],
)
s3_handler.s3_streaming_upload(product_url, trusted_domains, auth, bucket, s3_file)
s3_handler.s3_streaming_upload(product_url, config, bucket, s3_file, token_dict, token_lock)
Contributor
@agrosu1978 agrosu1978 Mar 11, 2025

Here, the token_dict should just be fetched from the shared variable. The token_dict should be refreshed by one thread only; this thread should be started before submitting all the dask tasks, similar to the monitoring thread implemented in the manage_dask_tasks_results function.
So here, the code should be something like:

try:
    # Create a thread lock to synchronize access to shared resources between
    # the threads of a given worker
    with token_lock:
        # Fetch the current value of the Variable. This Variable is updated by
        # one thread only, started in processors.py
        auth = token_dict.get("access_token")
    s3_handler = S3StorageHandler(
        os.environ["S3_ACCESSKEY"],
        os.environ["S3_SECRETKEY"],
        os.environ["S3_ENDPOINT"],
        os.environ["S3_REGION"],
    )
    s3_handler.s3_streaming_upload(product_url, trusted_domains, auth, bucket, s3_file)

Please see the comment at line 901

LE: After reading some more documentation about the dask Lock and dask Variable, and looking at the source code of dask Variable, I reached the following conclusion:

  • dask Variable uses a centralized, atomic update mechanism, ensuring that .get() always returns a fully stored value (this is done by the dask scheduler); the usage of the client.sync() function led me to this idea
  • locks are not needed for reading, because .get() never returns a partially written state
  • only writes need a lock to prevent multiple updates from conflicting; this should be discussed, because normally only the thread from processors.py should write it

So I think that here (on the dask Variable reader side) the token_lock should not be used.
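
A small sketch of this one-setter / multiple-readers pattern with the dask.distributed primitives (names are illustrative, and an active dask Client is assumed):

    from dask.distributed import Lock, Variable

    token_info = Variable("station_token")    # shared value, atomic get/set via the scheduler
    token_lock = Lock("station_token_lock")   # only needed by the single writer

    # Writer side (the refresh thread started from processors.py):
    with token_lock:
        token_info.set(get_station_token(config, token_info.get()))

    # Reader side (any streaming task on a dask worker): no lock needed, .get() is atomic.
    token_dict = token_info.get()
    auth = token_dict.get("access_token")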

dask_client: Client = self.dask_cluster_connect()
self.submit_tasks_to_dask_cluster(token, external_auth_config.trusted_domains, dask_client)
dask_client = self.dask_cluster_connect(external_auth_config.station_id)
self.submit_tasks_to_dask_cluster(external_auth_config, dask_client)
Contributor
@agrosu1978 agrosu1978 Mar 11, 2025

Before calling the submit function, a new thread should be started (similar to manage_dask_tasks_results):

dask_client = self.dask_cluster_connect(external_auth_config.station_id)
try:
    await asyncio.to_thread(self.manage_token, self.external_auth_config, self.token_lock, self.token_info)
except Exception as e:  # pylint: disable=broad-exception-caught
    self.log_job_execution(JobStatus.failed, 0, f"Error from token refreshment thread: {e}")
self.submit_tasks_to_dask_cluster(external_auth_config, dask_client)

The thread should be something like this (pseudocode)

def manage_token(self, config, token_lock, token_info):
    self.logger.info("Starting the token refreshment logic")

    while self.process_is_running:
        with token_lock:
            try:
                token_dict = token_info.get()
                # Get/refresh the access token if necessary
                token_dict = get_station_token(config, token_dict)
                token_info.set(token_dict)
            except Exception as e:  # pylint: disable=broad-exception-caught
                self.logger.error(f"Could not refresh the station token: {e}")
        time.sleep(TIME_TO_SLEEP)

self.process_is_running is set to False by the monitoring thread manage_dask_tasks_results when it finishes.
The reasons for my request are as follows:

  • centralized token refresh logic
  • reduces redundant requests, since only this thread refreshes the token
  • dask workers only need to read the token instead of managing the refresh logic
  • no changes in the s3_storage_handler module; the token refreshment logic should not be present there
  • ensures each task gets the latest token dynamically
  • important: one setter / multiple readers pattern, which is easier to manage and maintain

We can discuss this and, of course, I can help you.
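
As a complement to the pseudocode above, a hedged sketch of how such a refresh thread could be started without blocking the task submission (attribute and method names follow the pseudocode and are assumptions):

    import threading

    # Started from processors.py, before submitting the dask tasks.
    self.process_is_running = True
    refresh_thread = threading.Thread(
        target=self.manage_token,
        args=(external_auth_config, self.token_lock, self.token_info),
        daemon=True,
    )
    refresh_thread.start()

    self.submit_tasks_to_dask_cluster(external_auth_config, dask_client)
    # manage_dask_tasks_results sets process_is_running to False when all tasks
    # are done, which lets the refresh loop exit.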

auth=auth,
)
prepared_request = session.prepare_request(request)

Contributor

The retrying should be moved up to the caller level; the botocore retry mechanism for S3 is enough. I will handle this.
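
For reference, a minimal sketch of relying on botocore's built-in retries instead of a hand-rolled retry loop (the retry values are illustrative only):

    import os

    import boto3
    from botocore.config import Config

    s3_client = boto3.client(
        "s3",
        endpoint_url=os.environ["S3_ENDPOINT"],
        aws_access_key_id=os.environ["S3_ACCESSKEY"],
        aws_secret_access_key=os.environ["S3_SECRETKEY"],
        region_name=os.environ["S3_REGION"],
        config=Config(retries={"max_attempts": 5, "mode": "standard"}),
    )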

3 participants