Skip to content

Commit

Permalink
支持临时ak/sk进行上传、下载数据
Browse files Browse the repository at this point in the history
  • Loading branch information
wangchuanxin committed Nov 12, 2024
1 parent a888bdd commit 4956e3d
Showing 1 changed file with 17 additions and 9 deletions.
26 changes: 17 additions & 9 deletions dis_sdk_python/com/huaweicloud/dis/sdk/python/client/disclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,17 @@ class disclient(object):
:type projectid: string
:param projectid: hws project id for the user
:type xSecrityToken: string
:param xSecrityToken: tmp security token
: the user can get the ak/sk/projectid from the hws,the user can refer https://support.huaweicloud.com/usermanual-dis-sdk-resources-demo1/dis_01_0043.html
"""

DIS_SDK_VERSION = '2.0.0'
USER_AGENT = 'dis-python-sdk-v-' + DIS_SDK_VERSION
TIME_OUT = 60

def __init__(self, endpoint, ak, sk, projectid, region, bodySerializeType=''):
def __init__(self, endpoint, ak, sk, projectid, region, bodySerializeType='', xSecrityToken=''):
self.endpoint = endpoint
if not endpoint.startswith("http"):
self.endpoint = "https://" + endpoint
Expand All @@ -74,15 +77,17 @@ def __init__(self, endpoint, ak, sk, projectid, region, bodySerializeType=''):
self.projectid = projectid
self.region = region
self.bodySerializeType = bodySerializeType
self.xSecrityToken = xSecrityToken
self._timeout = self.TIME_OUT
self._useragent = self.USER_AGENT
self.result = []

def updateAuthInfo(self, ak, sk, projectid, region):
def updateAuthInfo(self, ak, sk, projectid, region, xSecrityToken):
self.ak = ak
self.sk = sk
self.projectid = projectid
self.region = region
self.xSecrityToken = xSecrityToken;

def setUserAgent(self, useragent):
self._useragent = useragent
Expand Down Expand Up @@ -111,6 +116,9 @@ def __generateRequest(self, method, uri, query={}, headers={}, body="", userak="
if userxSecrityToken is not "":
req.headers["X-Security-Token"] = userxSecrityToken

if self.xSecrityToken is not "":
req.headers["X-Security-Token"] = self.xSecrityToken

if (headers):
headers.update(req.headers)
req.headers = headers
Expand Down Expand Up @@ -378,7 +386,7 @@ def __list_of_groups(self, init_list, childern_list_len):
end_list.append(init_list[-count:]) if count != 0 else end_list
return end_list

def __Refine_data(self, stream_name, stream_id, records):
def __Refine_data(self, stream_name, stream_id, records, ak="", sk="", xSecrityToken=""):
totalPutRecordsResultEntryList = {}
totalPutRecordsResultEntryList['failed_record_count'] = 0
totalPutRecordsResultEntryList['records'] = []
Expand All @@ -393,7 +401,7 @@ def __Refine_data(self, stream_name, stream_id, records):
if retryCount != -1:
time.sleep(wait)
wait = wait * 2
r = self.__sendRecords(stream_name, stream_id, retryPutRecordsRequest)
r = self.__sendRecords(stream_name, stream_id, retryPutRecordsRequest, ak, sk, xSecrityToken)
currentFailed = r.failedRecordCount
# print("%s: send %s records,failed %s records,retryCount %s" % (
# streamname, len(retryPutRecordsRequest), currentFailed, retryCount + 1))
Expand Down Expand Up @@ -440,10 +448,10 @@ def __Refine_data(self, stream_name, stream_id, records):
log('{}{}:send {} records,failed {} records'.format(stream_name, stream_id, len(records), Faile_count), 'info')
return totalPutRecordsResultEntryList

def putRecords(self, streamname, records):
def putRecords(self, streamname, records, ak="", sk="", xSecrityToken=""):
if not stream_mes.get(streamname):
try:
r = self.describeStream(streamname)
r = self.describeStream(streamname, ak=ak, sk=sk, xSecrityToken=xSecrityToken)
if r.statusCode == 200:
stream_type = r.streamType
partitions = len([i for i in r.partitions if i.get('status') == 'ACTIVE'])
Expand Down Expand Up @@ -480,12 +488,12 @@ def putRecords(self, streamname, records):
end_list[i] = end_list[i][len(b):]
for j in range(0, len(new_records)):
rangeRecords = new_records[j]
r = self.__Refine_data(streamname, "", rangeRecords)
r = self.__Refine_data(streamname, "", rangeRecords, ak, sk, xSecrityToken)
totalPutRecordsResultEntryList['failed_record_count'] += r['failed_record_count']
totalPutRecordsResultEntryList['records'].extend(r['records'])
return disrecordresponse.disPutRecordsResponse(200, totalPutRecordsResultEntryList)

def put_records(self, stream_name, stream_id, records):
def put_records(self, stream_name, stream_id, records, ak="", sk="", xSecrityToken=""):
"""
support authorization scenarios use stream_id
stream_name is mandatory parameter, can be an empty string.
Expand All @@ -500,7 +508,7 @@ def put_records(self, stream_name, stream_id, records):
totalPutRecordsResultEntryList = {}
totalPutRecordsResultEntryList['failed_record_count'] = 0
totalPutRecordsResultEntryList['records'] = []
r = self.__Refine_data(stream_name, stream_id, records)
r = self.__Refine_data(stream_name, stream_id, records, ak, sk, xSecrityToken)
totalPutRecordsResultEntryList['failed_record_count'] += r['failed_record_count']
totalPutRecordsResultEntryList['records'].extend(r['records'])
return disrecordresponse.disPutRecordsResponse(200, totalPutRecordsResultEntryList)
Expand Down

0 comments on commit 4956e3d

Please sign in to comment.