1- # pylint: disable=invalid-name
1+ #
2+ # -------------------------------------------------------------------------
3+ #
4+ # Part of the CodeChecker project, under the Apache License v2.0 with
5+ # LLVM Exceptions. See LICENSE for license information.
6+ # SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
7+ #
8+ # -------------------------------------------------------------------------
9+ """
10+ A mock OAuth server that simulates the behavior of an OAuth provider.
11+ """
212
3- from http .server import BaseHTTPRequestHandler , HTTPServer
413import json
514import os
15+
616from authlib .oauth2 .rfc7636 import create_s256_code_challenge as hash_s256
717
18+ from http .server import BaseHTTPRequestHandler , HTTPServer
19+
820# Server config
921HOSTNAME = "0.0.0.0"
1022SERVERPORT = int (os .getenv ("PORT" )) if os .getenv ("PORT" ) else 3000
@@ -116,24 +128,10 @@ def login_tester(self):
116128 params [key ] = value
117129
118130 if "username" in query_params :
131+ # print(f"Login request with username: {params['username']}")
119132 query = f"{ params ['username' ]} :{ params ['password' ]} "
120133 query_result = self .users_by_data .get (query , None )
121- # csrf attack case
122- if params ['username' ] == "user_csrf" :
123- print ("CSRF attack detected" )
124- state = "fake_state"
125- code = query_result ['code' ]
126- code_challenge = params ['code_challenge' ]
127- # store code_challenge in the server
128- self .code_challenges [code ] = {
129- "code_challenge" : code_challenge ,
130- "code_challenge_method" : params [
131- 'code_challenge_method' ]}
132-
133- return self .show_json ({"code" : code ,
134- "state" : state })
135- # normal case
136- elif query_result :
134+ if query_result :
137135 state = params ['state' ]
138136 code = query_result ['code' ]
139137 code_challenge = params ['code_challenge' ]
@@ -149,7 +147,7 @@ def login_tester(self):
149147 except IndexError :
150148 return self .show_rejection ("Invalid query parameters" )
151149 except Exception as ex :
152- print (f"Error: { ex } " )
150+ print (f"Error in login_tester of OAuth mock server : { ex } " )
153151 return self .show_rejection ("Internal server error" )
154152
155153 def get_user (self ):
@@ -204,13 +202,15 @@ def handle_user_token_request(self):
204202 return self .show_rejection ("Invalid code" )
205203 return self .path
206204
205+ # pylint: disable=invalid-name
207206 def do_GET (self ):
208207 if self .path .startswith ("/login" ):
209208 return self .login_tester ()
210209 elif self .path .startswith ("/get_user" ):
211210 return self .get_user ()
212211 return self .path
213212
213+ # pylint: disable=invalid-name
214214 def do_POST (self ):
215215 if self .path .endswith ("/token" ):
216216 return self .handle_user_token_request ()
@@ -220,8 +220,8 @@ def do_POST(self):
220220
221221webServer = HTTPServer ((HOSTNAME , SERVERPORT ), OauthServer )
222222webServer .allow_reuse_address = True
223- print (f"Server started http://{ HOSTNAME } :{ SERVERPORT } " )
223+ # print(f"OAuth mock server started on http://{HOSTNAME}:{SERVERPORT}")
224224
225225webServer .serve_forever ()
226226webServer .server_close ()
227- print ("Server stopped. " )
227+ print (f"OAuth mock server stopped on http:// { HOSTNAME } : { SERVERPORT } " )
0 commit comments