From 45f2d7495856ea888dbfae8ec2314349499e82d1 Mon Sep 17 00:00:00 2001
From: Sai Kiran Polisetty
Date: Thu, 24 Oct 2024 18:34:09 +0530
Subject: [PATCH 01/13] Update

---
 src/grpc/infer_handler.cc        |  2 +-
 src/grpc/infer_handler.h         | 29 +++++++++++++++++++++++++--
 src/grpc/stream_infer_handler.cc |  2 +-
 src/http_server.cc               |  9 +++++----
 src/http_server.h                | 34 ++++++++++++++++++++++++++++----
 src/sagemaker_server.h           |  9 ++++++---
 src/shared_memory_manager.cc     | 11 +++++------
 src/shared_memory_manager.h      | 10 +++++++++-
 8 files changed, 84 insertions(+), 22 deletions(-)

diff --git a/src/grpc/infer_handler.cc b/src/grpc/infer_handler.cc
index c4ba9338cb..27c6817324 100644
--- a/src/grpc/infer_handler.cc
+++ b/src/grpc/infer_handler.cc
@@ -930,7 +930,7 @@ ModelInferHandler::Execute(InferHandler::State* state)
   auto request_release_payload =
       std::make_unique<RequestReleasePayload>(state->inference_request_);
   auto response_release_payload = std::make_unique<ResponseReleasePayload>(
-      state, std::move(shm_regions_info));
+      state, std::move(shm_regions_info), shm_manager_);
 
   if (err == nullptr) {
     err = TRITONSERVER_InferenceRequestSetReleaseCallback(
diff --git a/src/grpc/infer_handler.h b/src/grpc/infer_handler.h
index 87536dd173..7e5fe4e828 100644
--- a/src/grpc/infer_handler.h
+++ b/src/grpc/infer_handler.h
@@ -1276,15 +1276,40 @@ class InferHandler : public HandlerBase {
     State* state_;
     std::vector<std::shared_ptr<const SharedMemoryManager::SharedMemoryInfo>>
         shm_regions_info_;
+    std::shared_ptr<SharedMemoryManager> shm_manager_;
 
     ResponseReleasePayload(
         State* state,
         std::vector<
             std::shared_ptr<const SharedMemoryManager::SharedMemoryInfo>>&&
-            shm_regions_info)
-        : state_(state), shm_regions_info_(std::move(shm_regions_info))
+            shm_regions_info,
+        const std::shared_ptr<SharedMemoryManager>& shm_manager)
+        : state_(state), shm_regions_info_(std::move(shm_regions_info)),
+          shm_manager_(shm_manager)
     {
     }
+
+    ~ResponseReleasePayload()
+    {
+      // Unregister shm regions that are waiting for the completion of an
+      // inference.
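+      // Each entry holds a shared_ptr that keeps its region alive inside the
+      // SharedMemoryManager. The reference must be dropped before calling
+      // Unregister() below, otherwise the region would still look "in use"
+      // and simply be re-marked instead of released.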
+      while (!shm_regions_info_.empty()) {
+        auto shm_name = shm_regions_info_.back()->name_;
+        auto shm_memory_type = shm_regions_info_.back()->kind_;
+        auto marked_for_unregistration =
+            shm_regions_info_.back()->marked_for_unregistration_;
+
+        // Delete shared_ptr to decrement reference count
+        shm_regions_info_.pop_back();
+
+        if (marked_for_unregistration) {
+          auto err = shm_manager_->Unregister(shm_name, shm_memory_type);
+          if (err != nullptr) {
+            LOG_VERBOSE(1) << TRITONSERVER_ErrorMessage(err);
+          }
+        }
+      }
+    }
   };

   virtual void StartNewRequest() = 0;
diff --git a/src/grpc/stream_infer_handler.cc b/src/grpc/stream_infer_handler.cc
index e912e1512c..a7dd9baa77 100644
--- a/src/grpc/stream_infer_handler.cc
+++ b/src/grpc/stream_infer_handler.cc
@@ -306,7 +306,7 @@ ModelStreamInferHandler::Process(InferHandler::State* state, bool rpc_ok)
   auto request_release_payload =
       std::make_unique<RequestReleasePayload>(state->inference_request_);
   auto response_release_payload = std::make_unique<ResponseReleasePayload>(
-      state, std::move(shm_regions_info));
+      state, std::move(shm_regions_info), shm_manager_);
 
   if (err == nullptr) {
     err = TRITONSERVER_InferenceRequestSetReleaseCallback(
diff --git a/src/http_server.cc b/src/http_server.cc
index 99aed411b5..b87da19d2a 100644
--- a/src/http_server.cc
+++ b/src/http_server.cc
@@ -3283,12 +3283,12 @@ HTTPAPIServer::HandleGenerate(
     generate_request.reset(new GenerateRequestClass(
         server_.get(), req, GetResponseCompressionType(req),
         generate_stream_request_schema_.get(),
-        generate_stream_response_schema_.get(), streaming, irequest_shared));
+        generate_stream_response_schema_.get(), streaming, irequest_shared, shm_manager_));
   } else {
     generate_request.reset(new GenerateRequestClass(
         server_.get(), req, GetResponseCompressionType(req),
         generate_request_schema_.get(), generate_response_schema_.get(),
-        streaming, irequest_shared));
+        streaming, irequest_shared, shm_manager_));
   }
   generate_request->trace_ = trace;
@@ -3762,10 +3762,11 @@ HTTPAPIServer::InferRequestClass::RequestFiniHook(
 HTTPAPIServer::InferRequestClass::InferRequestClass(
     TRITONSERVER_Server* server, evhtp_request_t* req,
     DataCompressor::Type response_compression_type,
-    const std::shared_ptr<TRITONSERVER_InferenceRequest>& triton_request)
+    const std::shared_ptr<TRITONSERVER_InferenceRequest>& triton_request,
+    const std::shared_ptr<SharedMemoryManager>& shm_manager)
     : server_(server), req_(req),
       response_compression_type_(response_compression_type), response_count_(0),
-      triton_request_(triton_request)
+      triton_request_(triton_request), shm_manager_(shm_manager)
 {
   evhtp_connection_t* htpconn = evhtp_request_get_connection(req);
   thread_ = htpconn->thread;
diff --git a/src/http_server.h b/src/http_server.h
index 3949f97e27..fc0d6f9962 100644
--- a/src/http_server.h
+++ b/src/http_server.h
@@ -272,13 +272,34 @@ class HTTPAPIServer : public HTTPServer {
     explicit InferRequestClass(
         TRITONSERVER_Server* server, evhtp_request_t* req,
         DataCompressor::Type response_compression_type,
-        const std::shared_ptr<TRITONSERVER_InferenceRequest>& triton_request);
+        const std::shared_ptr<TRITONSERVER_InferenceRequest>& triton_request,
+        const std::shared_ptr<SharedMemoryManager>& shm_manager);
+
     virtual ~InferRequestClass()
     {
       if (req_ != nullptr) {
         evhtp_request_unset_hook(req_, evhtp_hook_on_request_fini);
       }
       req_ = nullptr;
+
+      // Unregister shm regions that are waiting for the completion of an
+      // inference.
+      while (!shm_regions_info_.empty()) {
+        auto shm_name = shm_regions_info_.back()->name_;
+        auto shm_memory_type = shm_regions_info_.back()->kind_;
+        auto marked_for_unregistration =
+            shm_regions_info_.back()->marked_for_unregistration_;
+
+        // Delete shared_ptr to decrement reference count
+        shm_regions_info_.pop_back();
+
+        if (marked_for_unregistration) {
+          auto err = shm_manager_->Unregister(shm_name, shm_memory_type);
+          if (err != nullptr) {
+            LOG_VERBOSE(1) << TRITONSERVER_ErrorMessage(err);
+          }
+        }
+      }
     }

     evhtp_request_t* EvHtpRequest() const { return req_; }
@@ -345,6 +366,8 @@
     std::vector<std::shared_ptr<const SharedMemoryManager::SharedMemoryInfo>>
         shm_regions_info_;

+    std::shared_ptr<SharedMemoryManager> shm_manager_;
+
     evhtp_res response_code_{EVHTP_RES_OK};
   };
@@ -355,9 +378,11 @@
         DataCompressor::Type response_compression_type,
         const MappingSchema* request_schema,
         const MappingSchema* response_schema, bool streaming,
-        const std::shared_ptr<TRITONSERVER_InferenceRequest>& triton_request)
+        const std::shared_ptr<TRITONSERVER_InferenceRequest>& triton_request,
+        const std::shared_ptr<SharedMemoryManager>& shm_manager)
         : InferRequestClass(
-              server, req, response_compression_type, triton_request),
+              server, req, response_compression_type, triton_request,
+              shm_manager),
           request_schema_(request_schema), response_schema_(response_schema),
           streaming_(streaming)
     {
@@ -471,7 +496,8 @@
       const std::shared_ptr<TRITONSERVER_InferenceRequest>& triton_request)
   {
     return std::unique_ptr<InferRequestClass>(new InferRequestClass(
-        server_.get(), req, GetResponseCompressionType(req), triton_request));
+        server_.get(), req, GetResponseCompressionType(req), triton_request,
+        shm_manager_));
   }

   // Helper function to retrieve infer request header in the form specified by
diff --git a/src/sagemaker_server.h b/src/sagemaker_server.h
index 7b1a272ac0..9ac9a7f45e 100644
--- a/src/sagemaker_server.h
+++ b/src/sagemaker_server.h
@@ -52,9 +52,11 @@ class SagemakerAPIServer : public HTTPAPIServer {
     explicit SagemakeInferRequestClass(
         TRITONSERVER_Server* server, evhtp_request_t* req,
         DataCompressor::Type response_compression_type,
-        const std::shared_ptr<TRITONSERVER_InferenceRequest>& triton_request)
+        const std::shared_ptr<TRITONSERVER_InferenceRequest>& triton_request,
+        const std::shared_ptr<SharedMemoryManager>& shm_manager)
         : InferRequestClass(
-              server, req, response_compression_type, triton_request)
+              server, req, response_compression_type, triton_request,
+              shm_manager)
     {
     }
     using InferRequestClass::InferResponseComplete;
@@ -124,7 +126,8 @@
     override
   {
     return std::unique_ptr<InferRequestClass>(new SagemakeInferRequestClass(
-        server_.get(), req, GetResponseCompressionType(req), triton_request));
+        server_.get(), req, GetResponseCompressionType(req), triton_request,
+        shm_manager_));
   }
   TRITONSERVER_Error* GetInferenceHeaderLength(
       evhtp_request_t* req, int32_t content_length,
diff --git a/src/shared_memory_manager.cc b/src/shared_memory_manager.cc
index 7b845709a1..3c32fb010d 100644
--- a/src/shared_memory_manager.cc
+++ b/src/shared_memory_manager.cc
@@ -686,12 +686,11 @@ SharedMemoryManager::UnregisterHelper(
   auto it = shared_memory_map_.find(name);
   if (it != shared_memory_map_.end() && it->second->kind_ == memory_type) {
     if (it->second.use_count() > 1) {
-      return TRITONSERVER_ErrorNew(
-          TRITONSERVER_ERROR_INTERNAL,
-          std::string(
-              "Cannot unregister shared memory region '" + name +
-              "', it is currently in use.")
-              .c_str());
+      it->second->marked_for_unregistration_ = true;
+      LOG_VERBOSE(1)
+          << "Shared memory region '" << name
+          << "' will be unregistered after in-flight requests complete.";
+      return nullptr;
     }

     if (it->second->kind_
== TRITONSERVER_MEMORY_CPU) { diff --git a/src/shared_memory_manager.h b/src/shared_memory_manager.h index 393fd29128..487fe0c00e 100644 --- a/src/shared_memory_manager.h +++ b/src/shared_memory_manager.h @@ -26,6 +26,7 @@ #pragma once #include +#include #include #include #include @@ -60,10 +61,16 @@ class SharedMemoryManager { const int64_t device_id) : name_(name), shm_key_(shm_key), offset_(offset), byte_size_(byte_size), shm_fd_(shm_fd), mapped_addr_(mapped_addr), - kind_(kind), device_id_(device_id) + kind_(kind), device_id_(device_id), marked_for_unregistration_(false) { } + ~SharedMemoryInfo() + { + std::cerr << "********* " << name_ + << ": destructor ~SharedMemoryInfo() called !! **********\n"; + } + std::string name_; std::string shm_key_; size_t offset_; @@ -72,6 +79,7 @@ class SharedMemoryManager { void* mapped_addr_; TRITONSERVER_MemoryType kind_; int64_t device_id_; + bool marked_for_unregistration_; }; #ifdef TRITON_ENABLE_GPU From 34911b26f783b64d2f76aecfd18b4b833586ef9e Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Thu, 24 Oct 2024 21:37:02 +0530 Subject: [PATCH 02/13] Update CI tests --- .../cuda_shared_memory_test.py | 178 ++++++++++++------ qa/L0_cuda_shared_memory/test.sh | 17 +- qa/L0_shared_memory/shared_memory_test.py | 178 ++++++++++++------ qa/L0_shared_memory/test.sh | 17 +- 4 files changed, 264 insertions(+), 126 deletions(-) diff --git a/qa/L0_cuda_shared_memory/cuda_shared_memory_test.py b/qa/L0_cuda_shared_memory/cuda_shared_memory_test.py index 51137e8934..5ebbfe088b 100755 --- a/qa/L0_cuda_shared_memory/cuda_shared_memory_test.py +++ b/qa/L0_cuda_shared_memory/cuda_shared_memory_test.py @@ -401,74 +401,31 @@ def test_infer_byte_size_out_of_bound(self): class TestCudaSharedMemoryUnregister(CudaSharedMemoryTestBase): - def _test_unregister_shm_fail(self): + def _test_unregister_shm_request_pass(self): second_client = httpclient.InferenceServerClient("localhost:8000", verbose=True) - with self.assertRaises(InferenceServerException) as ex: - second_client.unregister_cuda_shared_memory() - self.assertIn( - "Failed to unregister the following cuda shared memory regions: input0_data ,input1_data ,output0_data ,output1_data", - str(ex.exception), - ) - - with self.assertRaises(InferenceServerException) as ex: - second_client.unregister_cuda_shared_memory("input0_data") - self.assertIn( - "Cannot unregister shared memory region 'input0_data', it is currently in use.", - str(ex.exception), - ) - - with self.assertRaises(InferenceServerException) as ex: - second_client.unregister_cuda_shared_memory("input1_data") - self.assertIn( - "Cannot unregister shared memory region 'input1_data', it is currently in use.", - str(ex.exception), - ) + status_before_unregister = second_client.get_cuda_shared_memory_status() + self.assertEqual(len(status_before_unregister), 4) - with self.assertRaises(InferenceServerException) as ex: - second_client.unregister_cuda_shared_memory("output0_data") - self.assertIn( - "Cannot unregister shared memory region 'output0_data', it is currently in use.", - str(ex.exception), - ) + # Unregister all should not result in an error. + # If shared memory regions are in use, they will be marked and unregistered after the inference is completed. 
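+        # A second client connection is used deliberately: unregistration is
+        # server-side state, so it can be requested from any connection while
+        # the first client's inference is still in flight.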
+ second_client.unregister_cuda_shared_memory() - with self.assertRaises(InferenceServerException) as ex: - second_client.unregister_cuda_shared_memory("output1_data") - self.assertIn( - "Cannot unregister shared memory region 'output1_data', it is currently in use.", - str(ex.exception), - ) + # Number of shared memory regions should be the same as the inference is not completed yet + status_after_unregister = second_client.get_cuda_shared_memory_status() + self.assertEqual(len(status_after_unregister), 4) def _test_shm_not_found(self): second_client = httpclient.InferenceServerClient("localhost:8000", verbose=True) - with self.assertRaises(InferenceServerException) as ex: - second_client.get_cuda_shared_memory_status("input0_data") - self.assertIn( - "Unable to find cuda shared memory region: 'input0_data'", - str(ex.exception), - ) + status = second_client.get_cuda_shared_memory_status() + self.assertEqual(len(status), 0) - with self.assertRaises(InferenceServerException) as ex: - second_client.get_cuda_shared_memory_status("input1_data") - self.assertIn( - "Unable to find cuda shared memory region: 'input1_data'", - str(ex.exception), - ) + def _test_shm_found(self): + second_client = httpclient.InferenceServerClient("localhost:8000", verbose=True) - with self.assertRaises(InferenceServerException) as ex: - second_client.get_cuda_shared_memory_status("output0_data") - self.assertIn( - "Unable to find cuda shared memory region: 'output0_data'", - str(ex.exception), - ) - - with self.assertRaises(InferenceServerException) as ex: - second_client.get_cuda_shared_memory_status("output1_data") - self.assertIn( - "Unable to find cuda shared memory region: 'output1_data'", - str(ex.exception), - ) + status = second_client.get_cuda_shared_memory_status() + self.assertEqual(len(status), 4) def test_unregister_shm_during_inference_http(self): try: @@ -497,12 +454,53 @@ def test_unregister_shm_during_inference_http(self): time.sleep(2) # Try unregister shm regions during inference - self._test_unregister_shm_fail() + self._test_unregister_shm_request_pass() # Blocking call async_request.get_result() - # Try unregister shm regions after inference + # Test that all shm regions are successfully unregistered after inference without needing to call unregister again. + self._test_shm_not_found() + + finally: + self._cleanup_server(shm_handles) + + def test_unregister_shm_after_inference_http(self): + try: + self.triton_client.unregister_cuda_shared_memory() + shm_handles = self._configure_server() + + inputs = [ + httpclient.InferInput("INPUT0", [1, 16], "INT32"), + httpclient.InferInput("INPUT1", [1, 16], "INT32"), + ] + outputs = [ + httpclient.InferRequestedOutput("OUTPUT0", binary_data=True), + httpclient.InferRequestedOutput("OUTPUT1", binary_data=False), + ] + + inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE) + inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE) + + async_request = self.triton_client.async_infer( + model_name="simple", inputs=inputs, outputs=outputs + ) + + # Ensure inference started + time.sleep(2) + + # Test all registered shm regions exist during inference. + self._test_shm_found() + + # Blocking call + async_request.get_result() + + # Test all registered shm regions exist after inference, as unregister API have not been called. 
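+            # Nothing was marked for deferred unregistration in this test, so
+            # the regions must persist until the explicit unregister below.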
+ self._test_shm_found() + + # Test all shm regions are successfully unregistered after calling the unregister API after inference completed. self.triton_client.unregister_cuda_shared_memory() self._test_shm_not_found() @@ -547,7 +545,7 @@ def callback(user_data, result, error): time.sleep(2) # Try unregister shm regions during inference - self._test_unregister_shm_fail() + self._test_unregister_shm_request_pass() # Wait until the results are available in user_data time_out = 20 @@ -556,7 +554,63 @@ def callback(user_data, result, error): time.sleep(1) time.sleep(2) - # Try unregister shm regions after inference + # Test that all shm regions are successfully unregistered after inference without needing to call unregister again. + self._test_shm_not_found() + + finally: + self._cleanup_server(shm_handles) + + def test_unregister_shm_after_inference_grpc(self): + try: + self.triton_client.unregister_cuda_shared_memory() + shm_handles = self._configure_server() + + inputs = [ + grpcclient.InferInput("INPUT0", [1, 16], "INT32"), + grpcclient.InferInput("INPUT1", [1, 16], "INT32"), + ] + outputs = [ + grpcclient.InferRequestedOutput("OUTPUT0"), + grpcclient.InferRequestedOutput("OUTPUT1"), + ] + + inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE) + inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE) + + def callback(user_data, result, error): + if error: + user_data.append(error) + else: + user_data.append(result) + + user_data = [] + + self.triton_client.async_infer( + model_name="simple", + inputs=inputs, + outputs=outputs, + callback=partial(callback, user_data), + ) + + # Ensure inference started + time.sleep(2) + + # Test all registered shm regions exist during inference. + self._test_shm_found() + + # Wait until the results are available in user_data + time_out = 20 + while (len(user_data) == 0) and time_out > 0: + time_out = time_out - 1 + time.sleep(1) + time.sleep(2) + + # Test all registered shm regions exist after inference, as unregister API have not been called. + self._test_shm_found() + + # Test all shm regions are successfully unregistered after calling the unregister API after inference completed. self.triton_client.unregister_cuda_shared_memory() self._test_shm_not_found() diff --git a/qa/L0_cuda_shared_memory/test.sh b/qa/L0_cuda_shared_memory/test.sh index b7126a9295..6e5ef50942 100755 --- a/qa/L0_cuda_shared_memory/test.sh +++ b/qa/L0_cuda_shared_memory/test.sh @@ -100,7 +100,7 @@ for client_type in http grpc; do fi export CLIENT_TYPE=$client_type - CLIENT_LOG="./unregister_shm.$client_type.client.log" + CLIENT_LOG="./unregister_shm_during_inference_$client_type.client.log" set +e python3 $SHM_TEST TestCudaSharedMemoryUnregister.test_unregister_shm_during_inference_$client_type >>$CLIENT_LOG 2>&1 if [ $? -ne 0 ]; then @@ -116,6 +116,21 @@ for client_type in http grpc; do fi fi + CLIENT_LOG="./unregister_shm_after_inference_$client_type.client.log" + python3 $SHM_TEST TestCudaSharedMemoryUnregister.test_unregister_shm_after_inference_$client_type >>$CLIENT_LOG 2>&1 + if [ $? -ne 0 ]; then + cat $CLIENT_LOG + echo -e "\n***\n*** Test Failed\n***" + RET=1 + else + check_test_results $TEST_RESULT_FILE 1 + if [ $? -ne 0 ]; then + cat $TEST_RESULT_FILE + echo -e "\n***\n*** Test Result Verification Failed\n***" + RET=1 + fi + fi + kill $SERVER_PID wait $SERVER_PID if [ $? 
-ne 0 ]; then diff --git a/qa/L0_shared_memory/shared_memory_test.py b/qa/L0_shared_memory/shared_memory_test.py index 871fca9b2a..87c128f136 100755 --- a/qa/L0_shared_memory/shared_memory_test.py +++ b/qa/L0_shared_memory/shared_memory_test.py @@ -448,74 +448,31 @@ def test_python_client_leak(self): class TestSharedMemoryUnregister(SystemSharedMemoryTestBase): - def _test_unregister_shm_fail(self): + def _test_unregister_shm_request_pass(self): second_client = httpclient.InferenceServerClient("localhost:8000", verbose=True) - with self.assertRaises(utils.InferenceServerException) as ex: - second_client.unregister_system_shared_memory() - self.assertIn( - "Failed to unregister the following system shared memory regions: input0_data ,input1_data ,output0_data ,output1_data", - str(ex.exception), - ) - - with self.assertRaises(utils.InferenceServerException) as ex: - second_client.unregister_system_shared_memory("input0_data") - self.assertIn( - "Cannot unregister shared memory region 'input0_data', it is currently in use.", - str(ex.exception), - ) - - with self.assertRaises(utils.InferenceServerException) as ex: - second_client.unregister_system_shared_memory("input1_data") - self.assertIn( - "Cannot unregister shared memory region 'input1_data', it is currently in use.", - str(ex.exception), - ) + status_before_unregister = second_client.get_system_shared_memory_status() + self.assertEqual(len(status_before_unregister), 4) - with self.assertRaises(utils.InferenceServerException) as ex: - second_client.unregister_system_shared_memory("output0_data") - self.assertIn( - "Cannot unregister shared memory region 'output0_data', it is currently in use.", - str(ex.exception), - ) + # Unregister all should not result in an error. + # If shared memory regions are in use, they will be marked and unregistered after the inference is completed. 
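+        # The unregister-all request returns immediately; in-use regions are
+        # only marked, and the actual teardown happens when the in-flight
+        # request releases its references.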
+ second_client.unregister_system_shared_memory() - with self.assertRaises(utils.InferenceServerException) as ex: - second_client.unregister_system_shared_memory("output1_data") - self.assertIn( - "Cannot unregister shared memory region 'output1_data', it is currently in use.", - str(ex.exception), - ) + # Number of shared memory regions should be the same as the inference is not completed yet + status_after_unregister = second_client.get_system_shared_memory_status() + self.assertEqual(len(status_after_unregister), 4) def _test_shm_not_found(self): second_client = httpclient.InferenceServerClient("localhost:8000", verbose=True) - with self.assertRaises(utils.InferenceServerException) as ex: - second_client.get_system_shared_memory_status("input0_data") - self.assertIn( - "Unable to find system shared memory region: 'input0_data'", - str(ex.exception), - ) + status = second_client.get_system_shared_memory_status() + self.assertEqual(len(status), 0) - with self.assertRaises(utils.InferenceServerException) as ex: - second_client.get_system_shared_memory_status("input1_data") - self.assertIn( - "Unable to find system shared memory region: 'input1_data'", - str(ex.exception), - ) + def _test_shm_found(self): + second_client = httpclient.InferenceServerClient("localhost:8000", verbose=True) - with self.assertRaises(utils.InferenceServerException) as ex: - second_client.get_system_shared_memory_status("output0_data") - self.assertIn( - "Unable to find system shared memory region: 'output0_data'", - str(ex.exception), - ) - - with self.assertRaises(utils.InferenceServerException) as ex: - second_client.get_system_shared_memory_status("output1_data") - self.assertIn( - "Unable to find system shared memory region: 'output1_data'", - str(ex.exception), - ) + status = second_client.get_system_shared_memory_status() + self.assertEqual(len(status), 4) def test_unregister_shm_during_inference_http(self): try: @@ -544,12 +501,53 @@ def test_unregister_shm_during_inference_http(self): time.sleep(2) # Try unregister shm regions during inference - self._test_unregister_shm_fail() + self._test_unregister_shm_request_pass() + + # Blocking call + async_request.get_result() + + # Test that all shm regions are successfully unregistered after inference without needing to call unregister again. + self._test_shm_not_found() + + finally: + self._cleanup_server(shm_handles) + + def test_unregister_shm_after_inference_http(self): + try: + self.triton_client.unregister_system_shared_memory() + shm_handles = self._configure_server() + + inputs = [ + httpclient.InferInput("INPUT0", [1, 16], "INT32"), + httpclient.InferInput("INPUT1", [1, 16], "INT32"), + ] + outputs = [ + httpclient.InferRequestedOutput("OUTPUT0", binary_data=True), + httpclient.InferRequestedOutput("OUTPUT1", binary_data=False), + ] + + inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE) + inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE) + + async_request = self.triton_client.async_infer( + model_name="simple", inputs=inputs, outputs=outputs + ) + + # Ensure inference started + time.sleep(2) + + # Test all registered shm regions exist during inference. + self._test_shm_found() # Blocking call async_request.get_result() - # Try unregister shm regions after inference + # Test all registered shm regions exist after inference, as unregister API have not been called. 
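+            # Regions registered before the request stay visible here because
+            # no unregister call has been made yet.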
+ self._test_shm_found() + + # Test all shm regions are successfully unregistered after calling the unregister API after inference completed. self.triton_client.unregister_system_shared_memory() self._test_shm_not_found() @@ -594,7 +592,7 @@ def callback(user_data, result, error): time.sleep(2) # Try unregister shm regions during inference - self._test_unregister_shm_fail() + self._test_unregister_shm_request_pass() # Wait until the results are available in user_data time_out = 20 @@ -603,7 +601,63 @@ def callback(user_data, result, error): time.sleep(1) time.sleep(2) - # Try unregister shm regions after inference + # Test that all shm regions are successfully unregistered after inference without needing to call unregister again. + self._test_shm_not_found() + + finally: + self._cleanup_server(shm_handles) + + def test_unregister_shm_after_inference_grpc(self): + try: + self.triton_client.unregister_system_shared_memory() + shm_handles = self._configure_server() + + inputs = [ + grpcclient.InferInput("INPUT0", [1, 16], "INT32"), + grpcclient.InferInput("INPUT1", [1, 16], "INT32"), + ] + outputs = [ + grpcclient.InferRequestedOutput("OUTPUT0"), + grpcclient.InferRequestedOutput("OUTPUT1"), + ] + + inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE) + inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE) + + def callback(user_data, result, error): + if error: + user_data.append(error) + else: + user_data.append(result) + + user_data = [] + + self.triton_client.async_infer( + model_name="simple", + inputs=inputs, + outputs=outputs, + callback=partial(callback, user_data), + ) + + # Ensure inference started + time.sleep(2) + + # Test all registered shm regions exist during inference. + self._test_shm_found() + + # Wait until the results are available in user_data + time_out = 20 + while (len(user_data) == 0) and time_out > 0: + time_out = time_out - 1 + time.sleep(1) + time.sleep(2) + + # Test all registered shm regions exist after inference, as unregister API have not been called. + self._test_shm_found() + + # Test all shm regions are successfully unregistered after calling the unregister API after inference completed. self.triton_client.unregister_system_shared_memory() self._test_shm_not_found() diff --git a/qa/L0_shared_memory/test.sh b/qa/L0_shared_memory/test.sh index e711de9cff..23b5bb9675 100755 --- a/qa/L0_shared_memory/test.sh +++ b/qa/L0_shared_memory/test.sh @@ -110,7 +110,7 @@ for client_type in http grpc; do fi export CLIENT_TYPE=$client_type - CLIENT_LOG="./unregister_shm.$client_type.client.log" + CLIENT_LOG="./unregister_shm_during_inference_$client_type.client.log" set +e python3 $SHM_TEST TestSharedMemoryUnregister.test_unregister_shm_during_inference_$client_type >>$CLIENT_LOG 2>&1 if [ $? -ne 0 ]; then @@ -126,6 +126,21 @@ for client_type in http grpc; do fi fi + CLIENT_LOG="./unregister_shm_after_inference_$client_type.client.log" + python3 $SHM_TEST TestSharedMemoryUnregister.test_unregister_shm_after_inference_$client_type >>$CLIENT_LOG 2>&1 + if [ $? -ne 0 ]; then + cat $CLIENT_LOG + echo -e "\n***\n*** Test Failed\n***" + RET=1 + else + check_test_results $TEST_RESULT_FILE 1 + if [ $? -ne 0 ]; then + cat $TEST_RESULT_FILE + echo -e "\n***\n*** Test Result Verification Failed\n***" + RET=1 + fi + fi + kill $SERVER_PID wait $SERVER_PID if [ $? 
-ne 0 ]; then From 8a4676c5c8e7a67e236886e026874db61af3c5ed Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Fri, 25 Oct 2024 20:45:03 +0530 Subject: [PATCH 03/13] Remove debug changes --- src/shared_memory_manager.h | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/shared_memory_manager.h b/src/shared_memory_manager.h index 487fe0c00e..1fd2900fcf 100644 --- a/src/shared_memory_manager.h +++ b/src/shared_memory_manager.h @@ -65,12 +65,6 @@ class SharedMemoryManager { { } - ~SharedMemoryInfo() - { - std::cerr << "********* " << name_ - << ": destructor ~SharedMemoryInfo() called !! **********\n"; - } - std::string name_; std::string shm_key_; size_t offset_; From 94b53c3f5621279c29724d9dea0f946dc2294482 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Fri, 25 Oct 2024 20:45:30 +0530 Subject: [PATCH 04/13] Remove debug changes --- src/shared_memory_manager.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/shared_memory_manager.h b/src/shared_memory_manager.h index 1fd2900fcf..4fc13cfcde 100644 --- a/src/shared_memory_manager.h +++ b/src/shared_memory_manager.h @@ -26,7 +26,6 @@ #pragma once #include -#include #include #include #include From 235d8f81b0160022308f70d8b6e6e84076033d0c Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Fri, 25 Oct 2024 21:01:33 +0530 Subject: [PATCH 05/13] Update copyright --- src/http_server.cc | 3 ++- src/sagemaker_server.h | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/http_server.cc b/src/http_server.cc index b87da19d2a..f470177bd7 100644 --- a/src/http_server.cc +++ b/src/http_server.cc @@ -3283,7 +3283,8 @@ HTTPAPIServer::HandleGenerate( generate_request.reset(new GenerateRequestClass( server_.get(), req, GetResponseCompressionType(req), generate_stream_request_schema_.get(), - generate_stream_response_schema_.get(), streaming, irequest_shared, shm_manager_)); + generate_stream_response_schema_.get(), streaming, irequest_shared, + shm_manager_)); } else { generate_request.reset(new GenerateRequestClass( server_.get(), req, GetResponseCompressionType(req), diff --git a/src/sagemaker_server.h b/src/sagemaker_server.h index 9ac9a7f45e..42af4ef2e9 100644 --- a/src/sagemaker_server.h +++ b/src/sagemaker_server.h @@ -1,4 +1,4 @@ -// Copyright 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
// // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions From dc2dcfc18a2095719a327b41ddc02e1ae45ed05a Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Fri, 25 Oct 2024 21:05:48 +0530 Subject: [PATCH 06/13] Fix pre-commit --- qa/L0_shared_memory/shared_memory_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qa/L0_shared_memory/shared_memory_test.py b/qa/L0_shared_memory/shared_memory_test.py index 87c128f136..7d114141af 100755 --- a/qa/L0_shared_memory/shared_memory_test.py +++ b/qa/L0_shared_memory/shared_memory_test.py @@ -511,7 +511,7 @@ def test_unregister_shm_during_inference_http(self): finally: self._cleanup_server(shm_handles) - + def test_unregister_shm_after_inference_http(self): try: self.triton_client.unregister_system_shared_memory() From febb029d8de695ad325272634e7d4214ebb28ee7 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Mon, 28 Oct 2024 14:18:27 +0530 Subject: [PATCH 07/13] Update variable name --- src/grpc/infer_handler.h | 6 +++--- src/http_server.h | 6 +++--- src/shared_memory_manager.cc | 2 +- src/shared_memory_manager.h | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/grpc/infer_handler.h b/src/grpc/infer_handler.h index 7e5fe4e828..9a71bbb0f6 100644 --- a/src/grpc/infer_handler.h +++ b/src/grpc/infer_handler.h @@ -1296,13 +1296,13 @@ class InferHandler : public HandlerBase { while (!shm_regions_info_.empty()) { auto shm_name = shm_regions_info_.back()->name_; auto shm_memory_type = shm_regions_info_.back()->kind_; - auto marked_for_unregistration = - shm_regions_info_.back()->marked_for_unregistration_; + auto pending_unregister = + shm_regions_info_.back()->pending_unregister_; // Delete shared_ptr to decrement reference count shm_regions_info_.pop_back(); - if (marked_for_unregistration) { + if (pending_unregister) { auto err = shm_manager_->Unregister(shm_name, shm_memory_type); if (err != nullptr) { LOG_VERBOSE(1) << TRITONSERVER_ErrorMessage(err); diff --git a/src/http_server.h b/src/http_server.h index fc0d6f9962..2d7b8e6297 100644 --- a/src/http_server.h +++ b/src/http_server.h @@ -287,13 +287,13 @@ class HTTPAPIServer : public HTTPServer { while (!shm_regions_info_.empty()) { auto shm_name = shm_regions_info_.back()->name_; auto shm_memory_type = shm_regions_info_.back()->kind_; - auto marked_for_unregistration = - shm_regions_info_.back()->marked_for_unregistration_; + auto pending_unregister = + shm_regions_info_.back()->pending_unregister_; // Delete shared_ptr to decrement reference count shm_regions_info_.pop_back(); - if (marked_for_unregistration) { + if (pending_unregister) { auto err = shm_manager_->Unregister(shm_name, shm_memory_type); if (err != nullptr) { LOG_VERBOSE(1) << TRITONSERVER_ErrorMessage(err); diff --git a/src/shared_memory_manager.cc b/src/shared_memory_manager.cc index 3c32fb010d..41b3f8890e 100644 --- a/src/shared_memory_manager.cc +++ b/src/shared_memory_manager.cc @@ -686,7 +686,7 @@ SharedMemoryManager::UnregisterHelper( auto it = shared_memory_map_.find(name); if (it != shared_memory_map_.end() && it->second->kind_ == memory_type) { if (it->second.use_count() > 1) { - it->second->marked_for_unregistration_ = true; + it->second->pending_unregister_ = true; LOG_VERBOSE(1) << "Shared memory region '" << name << "' will be unregistered after in-flight requests complete."; diff --git a/src/shared_memory_manager.h b/src/shared_memory_manager.h index 4fc13cfcde..29352dff02 100644 
--- a/src/shared_memory_manager.h +++ b/src/shared_memory_manager.h @@ -60,7 +60,7 @@ class SharedMemoryManager { const int64_t device_id) : name_(name), shm_key_(shm_key), offset_(offset), byte_size_(byte_size), shm_fd_(shm_fd), mapped_addr_(mapped_addr), - kind_(kind), device_id_(device_id), marked_for_unregistration_(false) + kind_(kind), device_id_(device_id), pending_unregister_(false) { } @@ -72,7 +72,7 @@ class SharedMemoryManager { void* mapped_addr_; TRITONSERVER_MemoryType kind_; int64_t device_id_; - bool marked_for_unregistration_; + bool pending_unregister_; }; #ifdef TRITON_ENABLE_GPU From c0a22bd83309e8fc7b9ae41c26932d68e6a58b18 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Mon, 28 Oct 2024 14:23:32 +0530 Subject: [PATCH 08/13] Update --- src/grpc/infer_handler.h | 6 +++--- src/http_server.h | 6 +++--- src/shared_memory_manager.cc | 2 +- src/shared_memory_manager.h | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/grpc/infer_handler.h b/src/grpc/infer_handler.h index 9a71bbb0f6..8cb5c8315c 100644 --- a/src/grpc/infer_handler.h +++ b/src/grpc/infer_handler.h @@ -1296,13 +1296,13 @@ class InferHandler : public HandlerBase { while (!shm_regions_info_.empty()) { auto shm_name = shm_regions_info_.back()->name_; auto shm_memory_type = shm_regions_info_.back()->kind_; - auto pending_unregister = - shm_regions_info_.back()->pending_unregister_; + auto awaiting_unregister = + shm_regions_info_.back()->awaiting_unregister_; // Delete shared_ptr to decrement reference count shm_regions_info_.pop_back(); - if (pending_unregister) { + if (awaiting_unregister) { auto err = shm_manager_->Unregister(shm_name, shm_memory_type); if (err != nullptr) { LOG_VERBOSE(1) << TRITONSERVER_ErrorMessage(err); diff --git a/src/http_server.h b/src/http_server.h index 2d7b8e6297..1b8680a19a 100644 --- a/src/http_server.h +++ b/src/http_server.h @@ -287,13 +287,13 @@ class HTTPAPIServer : public HTTPServer { while (!shm_regions_info_.empty()) { auto shm_name = shm_regions_info_.back()->name_; auto shm_memory_type = shm_regions_info_.back()->kind_; - auto pending_unregister = - shm_regions_info_.back()->pending_unregister_; + auto awaiting_unregister = + shm_regions_info_.back()->awaiting_unregister_; // Delete shared_ptr to decrement reference count shm_regions_info_.pop_back(); - if (pending_unregister) { + if (awaiting_unregister) { auto err = shm_manager_->Unregister(shm_name, shm_memory_type); if (err != nullptr) { LOG_VERBOSE(1) << TRITONSERVER_ErrorMessage(err); diff --git a/src/shared_memory_manager.cc b/src/shared_memory_manager.cc index 41b3f8890e..e1966f79c8 100644 --- a/src/shared_memory_manager.cc +++ b/src/shared_memory_manager.cc @@ -686,7 +686,7 @@ SharedMemoryManager::UnregisterHelper( auto it = shared_memory_map_.find(name); if (it != shared_memory_map_.end() && it->second->kind_ == memory_type) { if (it->second.use_count() > 1) { - it->second->pending_unregister_ = true; + it->second->awaiting_unregister_ = true; LOG_VERBOSE(1) << "Shared memory region '" << name << "' will be unregistered after in-flight requests complete."; diff --git a/src/shared_memory_manager.h b/src/shared_memory_manager.h index 29352dff02..9af9abaade 100644 --- a/src/shared_memory_manager.h +++ b/src/shared_memory_manager.h @@ -60,7 +60,7 @@ class SharedMemoryManager { const int64_t device_id) : name_(name), shm_key_(shm_key), offset_(offset), byte_size_(byte_size), shm_fd_(shm_fd), mapped_addr_(mapped_addr), - kind_(kind), device_id_(device_id), pending_unregister_(false) + 
kind_(kind), device_id_(device_id), awaiting_unregister_(false) { } @@ -72,7 +72,7 @@ class SharedMemoryManager { void* mapped_addr_; TRITONSERVER_MemoryType kind_; int64_t device_id_; - bool pending_unregister_; + bool awaiting_unregister_; }; #ifdef TRITON_ENABLE_GPU From 0541f33cd910188228fc4501db4de519c381533e Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Mon, 4 Nov 2024 13:35:33 +0530 Subject: [PATCH 09/13] Update --- .../cuda_shared_memory_test.py | 59 +++++++++++-------- qa/L0_shared_memory/shared_memory_test.py | 59 +++++++++++-------- src/grpc/infer_handler.h | 10 +++- src/http_server.h | 10 +++- 4 files changed, 86 insertions(+), 52 deletions(-) diff --git a/qa/L0_cuda_shared_memory/cuda_shared_memory_test.py b/qa/L0_cuda_shared_memory/cuda_shared_memory_test.py index 5ebbfe088b..3bfb1fedd2 100755 --- a/qa/L0_cuda_shared_memory/cuda_shared_memory_test.py +++ b/qa/L0_cuda_shared_memory/cuda_shared_memory_test.py @@ -401,36 +401,46 @@ def test_infer_byte_size_out_of_bound(self): class TestCudaSharedMemoryUnregister(CudaSharedMemoryTestBase): - def _test_unregister_shm_request_pass(self): - second_client = httpclient.InferenceServerClient("localhost:8000", verbose=True) - - status_before_unregister = second_client.get_cuda_shared_memory_status() - self.assertEqual(len(status_before_unregister), 4) + def _test_unregister_shm_request_pass(self, shm_names): + self._test_shm_found(shm_names) # Unregister all should not result in an error. # If shared memory regions are in use, they will be marked and unregistered after the inference is completed. - second_client.unregister_cuda_shared_memory() + with httpclient.InferenceServerClient( + "localhost:8000", verbose=True + ) as second_client: + second_client.unregister_cuda_shared_memory() # Number of shared memory regions should be the same as the inference is not completed yet - status_after_unregister = second_client.get_cuda_shared_memory_status() - self.assertEqual(len(status_after_unregister), 4) + self._test_shm_found(shm_names) - def _test_shm_not_found(self): + def _test_shm_not_found(self, shm_names): + self.assertGreater(len(shm_names), 0) second_client = httpclient.InferenceServerClient("localhost:8000", verbose=True) - status = second_client.get_cuda_shared_memory_status() - self.assertEqual(len(status), 0) + for shm_name in shm_names: + with self.assertRaises(InferenceServerException) as ex: + second_client.get_cuda_shared_memory_status(shm_name) + self.assertIn( + f"Unable to find cuda shared memory region: '{shm_name}'", + str(ex.exception), + ) - def _test_shm_found(self): + def _test_shm_found(self, shm_names): + self.assertGreater(len(shm_names), 0) second_client = httpclient.InferenceServerClient("localhost:8000", verbose=True) status = second_client.get_cuda_shared_memory_status() - self.assertEqual(len(status), 4) + self.assertEqual(len(status), len(shm_names)) + + for shm_info in status: + self.assertIn(shm_info["name"], shm_names) def test_unregister_shm_during_inference_http(self): try: self.triton_client.unregister_cuda_shared_memory() shm_handles = self._configure_server() + shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"] inputs = [ httpclient.InferInput("INPUT0", [1, 16], "INT32"), @@ -454,13 +464,13 @@ def test_unregister_shm_during_inference_http(self): time.sleep(2) # Try unregister shm regions during inference - self._test_unregister_shm_request_pass() + self._test_unregister_shm_request_pass(shm_names) # Blocking call async_request.get_result() # Test that all shm regions 
are successfully unregistered after inference without needing to call unregister again. - self._test_shm_not_found() + self._test_shm_not_found(shm_names) finally: self._cleanup_server(shm_handles) @@ -469,6 +479,7 @@ def test_unregister_shm_after_inference_http(self): try: self.triton_client.unregister_cuda_shared_memory() shm_handles = self._configure_server() + shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"] inputs = [ httpclient.InferInput("INPUT0", [1, 16], "INT32"), @@ -492,17 +503,17 @@ def test_unregister_shm_after_inference_http(self): time.sleep(2) # Test all registered shm regions exist during inference. - self._test_shm_found() + self._test_shm_found(shm_names) # Blocking call async_request.get_result() # Test all registered shm regions exist after inference, as unregister API have not been called. - self._test_shm_found() + self._test_shm_found(shm_names) # Test all shm regions are successfully unregistered after calling the unregister API after inference completed. self.triton_client.unregister_cuda_shared_memory() - self._test_shm_not_found() + self._test_shm_not_found(shm_names) finally: self._cleanup_server(shm_handles) @@ -511,6 +522,7 @@ def test_unregister_shm_during_inference_grpc(self): try: self.triton_client.unregister_cuda_shared_memory() shm_handles = self._configure_server() + shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"] inputs = [ grpcclient.InferInput("INPUT0", [1, 16], "INT32"), @@ -545,7 +557,7 @@ def callback(user_data, result, error): time.sleep(2) # Try unregister shm regions during inference - self._test_unregister_shm_request_pass() + self._test_unregister_shm_request_pass(shm_names) # Wait until the results are available in user_data time_out = 20 @@ -555,7 +567,7 @@ def callback(user_data, result, error): time.sleep(2) # Test that all shm regions are successfully unregistered after inference without needing to call unregister again. - self._test_shm_not_found() + self._test_shm_not_found(shm_names) finally: self._cleanup_server(shm_handles) @@ -564,6 +576,7 @@ def test_unregister_shm_after_inference_grpc(self): try: self.triton_client.unregister_cuda_shared_memory() shm_handles = self._configure_server() + shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"] inputs = [ grpcclient.InferInput("INPUT0", [1, 16], "INT32"), @@ -598,7 +611,7 @@ def callback(user_data, result, error): time.sleep(2) # Test all registered shm regions exist during inference. - self._test_shm_found() + self._test_shm_found(shm_names) # Wait until the results are available in user_data time_out = 20 @@ -608,11 +621,11 @@ def callback(user_data, result, error): time.sleep(2) # Test all registered shm regions exist after inference, as unregister API have not been called. - self._test_shm_found() + self._test_shm_found(shm_names) # Test all shm regions are successfully unregistered after calling the unregister API after inference completed. 
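+            # With no inference in flight any more, the unregister call takes
+            # effect immediately instead of being deferred.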
self.triton_client.unregister_cuda_shared_memory() - self._test_shm_not_found() + self._test_shm_not_found(shm_names) finally: self._cleanup_server(shm_handles) diff --git a/qa/L0_shared_memory/shared_memory_test.py b/qa/L0_shared_memory/shared_memory_test.py index 7d114141af..4f0d8eab7a 100755 --- a/qa/L0_shared_memory/shared_memory_test.py +++ b/qa/L0_shared_memory/shared_memory_test.py @@ -448,36 +448,46 @@ def test_python_client_leak(self): class TestSharedMemoryUnregister(SystemSharedMemoryTestBase): - def _test_unregister_shm_request_pass(self): - second_client = httpclient.InferenceServerClient("localhost:8000", verbose=True) - - status_before_unregister = second_client.get_system_shared_memory_status() - self.assertEqual(len(status_before_unregister), 4) + def _test_unregister_shm_request_pass(self, shm_names): + self._test_shm_found(shm_names) # Unregister all should not result in an error. # If shared memory regions are in use, they will be marked and unregistered after the inference is completed. - second_client.unregister_system_shared_memory() + with httpclient.InferenceServerClient( + "localhost:8000", verbose=True + ) as second_client: + second_client.unregister_system_shared_memory() # Number of shared memory regions should be the same as the inference is not completed yet - status_after_unregister = second_client.get_system_shared_memory_status() - self.assertEqual(len(status_after_unregister), 4) + self._test_shm_found(shm_names) - def _test_shm_not_found(self): + def _test_shm_not_found(self, shm_names): + self.assertGreater(len(shm_names), 0) second_client = httpclient.InferenceServerClient("localhost:8000", verbose=True) - status = second_client.get_system_shared_memory_status() - self.assertEqual(len(status), 0) + for shm_name in shm_names: + with self.assertRaises(utils.InferenceServerException) as ex: + second_client.get_system_shared_memory_status(shm_name) + self.assertIn( + f"Unable to find system shared memory region: '{shm_name}'", + str(ex.exception), + ) - def _test_shm_found(self): + def _test_shm_found(self, shm_names): + self.assertGreater(len(shm_names), 0) second_client = httpclient.InferenceServerClient("localhost:8000", verbose=True) status = second_client.get_system_shared_memory_status() - self.assertEqual(len(status), 4) + self.assertEqual(len(status), len(shm_names)) + + for shm_info in status: + self.assertIn(shm_info["name"], shm_names) def test_unregister_shm_during_inference_http(self): try: self.triton_client.unregister_system_shared_memory() shm_handles = self._configure_server() + shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"] inputs = [ httpclient.InferInput("INPUT0", [1, 16], "INT32"), @@ -501,13 +511,13 @@ def test_unregister_shm_during_inference_http(self): time.sleep(2) # Try unregister shm regions during inference - self._test_unregister_shm_request_pass() + self._test_unregister_shm_request_pass(shm_names) # Blocking call async_request.get_result() # Test that all shm regions are successfully unregistered after inference without needing to call unregister again. 
- self._test_shm_not_found() + self._test_shm_not_found(shm_names) finally: self._cleanup_server(shm_handles) @@ -516,6 +526,7 @@ def test_unregister_shm_after_inference_http(self): try: self.triton_client.unregister_system_shared_memory() shm_handles = self._configure_server() + shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"] inputs = [ httpclient.InferInput("INPUT0", [1, 16], "INT32"), @@ -539,17 +550,17 @@ def test_unregister_shm_after_inference_http(self): time.sleep(2) # Test all registered shm regions exist during inference. - self._test_shm_found() + self._test_shm_found(shm_names) # Blocking call async_request.get_result() # Test all registered shm regions exist after inference, as unregister API have not been called. - self._test_shm_found() + self._test_shm_found(shm_names) # Test all shm regions are successfully unregistered after calling the unregister API after inference completed. self.triton_client.unregister_system_shared_memory() - self._test_shm_not_found() + self._test_shm_not_found(shm_names) finally: self._cleanup_server(shm_handles) @@ -558,6 +569,7 @@ def test_unregister_shm_during_inference_grpc(self): try: self.triton_client.unregister_system_shared_memory() shm_handles = self._configure_server() + shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"] inputs = [ grpcclient.InferInput("INPUT0", [1, 16], "INT32"), @@ -592,7 +604,7 @@ def callback(user_data, result, error): time.sleep(2) # Try unregister shm regions during inference - self._test_unregister_shm_request_pass() + self._test_unregister_shm_request_pass(shm_names) # Wait until the results are available in user_data time_out = 20 @@ -602,7 +614,7 @@ def callback(user_data, result, error): time.sleep(2) # Test that all shm regions are successfully unregistered after inference without needing to call unregister again. - self._test_shm_not_found() + self._test_shm_not_found(shm_names) finally: self._cleanup_server(shm_handles) @@ -611,6 +623,7 @@ def test_unregister_shm_after_inference_grpc(self): try: self.triton_client.unregister_system_shared_memory() shm_handles = self._configure_server() + shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"] inputs = [ grpcclient.InferInput("INPUT0", [1, 16], "INT32"), @@ -645,7 +658,7 @@ def callback(user_data, result, error): time.sleep(2) # Test all registered shm regions exist during inference. - self._test_shm_found() + self._test_shm_found(shm_names) # Wait until the results are available in user_data time_out = 20 @@ -655,11 +668,11 @@ def callback(user_data, result, error): time.sleep(2) # Test all registered shm regions exist after inference, as unregister API have not been called. - self._test_shm_found() + self._test_shm_found(shm_names) # Test all shm regions are successfully unregistered after calling the unregister API after inference completed. 
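+            # Same contract as the CUDA variant: once the request has
+            # completed, unregistration is immediate.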
self.triton_client.unregister_system_shared_memory() - self._test_shm_not_found() + self._test_shm_not_found(shm_names) finally: self._cleanup_server(shm_handles) diff --git a/src/grpc/infer_handler.h b/src/grpc/infer_handler.h index 8cb5c8315c..45454e09bd 100644 --- a/src/grpc/infer_handler.h +++ b/src/grpc/infer_handler.h @@ -1303,9 +1303,13 @@ class InferHandler : public HandlerBase { shm_regions_info_.pop_back(); if (awaiting_unregister) { - auto err = shm_manager_->Unregister(shm_name, shm_memory_type); - if (err != nullptr) { - LOG_VERBOSE(1) << TRITONSERVER_ErrorMessage(err); + if (shm_manager_ != nullptr) { + auto err = shm_manager_->Unregister(shm_name, shm_memory_type); + if (err != nullptr) { + LOG_VERBOSE(1) << TRITONSERVER_ErrorMessage(err); + } + } else { + LOG_VERBOSE(1) << "Shared memory manager is not available"; } } } diff --git a/src/http_server.h b/src/http_server.h index 1b8680a19a..6e2a35a79f 100644 --- a/src/http_server.h +++ b/src/http_server.h @@ -294,9 +294,13 @@ class HTTPAPIServer : public HTTPServer { shm_regions_info_.pop_back(); if (awaiting_unregister) { - auto err = shm_manager_->Unregister(shm_name, shm_memory_type); - if (err != nullptr) { - LOG_VERBOSE(1) << TRITONSERVER_ErrorMessage(err); + if (shm_manager_ != nullptr) { + auto err = shm_manager_->Unregister(shm_name, shm_memory_type); + if (err != nullptr) { + LOG_VERBOSE(1) << TRITONSERVER_ErrorMessage(err); + } + } else { + LOG_VERBOSE(1) << "Shared memory manager is not available"; } } } From 43c52f393ac0de54736641da5471da698b0f0b39 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Mon, 4 Nov 2024 14:14:21 +0530 Subject: [PATCH 10/13] Update --- qa/L0_shared_memory/shared_memory_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/qa/L0_shared_memory/shared_memory_test.py b/qa/L0_shared_memory/shared_memory_test.py index 72525bc5b2..a529f778b0 100755 --- a/qa/L0_shared_memory/shared_memory_test.py +++ b/qa/L0_shared_memory/shared_memory_test.py @@ -49,8 +49,8 @@ class SystemSharedMemoryTestBase(tu.TestResultCollector): DEFAULT_SHM_BYTE_SIZE = 64 def setUp(self): - self._setup_client()shm_handles = - self._ [] + self._setup_client() + self._shm_handles = [] def tearDown(self): self._cleanup_shm_handles() From 4cbbb7b79d4c908de5473ac79fdc77f1c1499b1e Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Mon, 4 Nov 2024 18:09:22 +0530 Subject: [PATCH 11/13] Update cuda shm test --- .../cuda_shared_memory_test.py | 426 +++++++++--------- 1 file changed, 211 insertions(+), 215 deletions(-) diff --git a/qa/L0_cuda_shared_memory/cuda_shared_memory_test.py b/qa/L0_cuda_shared_memory/cuda_shared_memory_test.py index 3bfb1fedd2..77634e0f5d 100755 --- a/qa/L0_cuda_shared_memory/cuda_shared_memory_test.py +++ b/qa/L0_cuda_shared_memory/cuda_shared_memory_test.py @@ -49,6 +49,10 @@ class CudaSharedMemoryTestBase(tu.TestResultCollector): def setUp(self): self._setup_client() + self._shm_handles = [] + + def tearDown(self): + self._cleanup_shm_handles() def _setup_client(self): self.protocol = os.environ.get("CLIENT_TYPE", "http") @@ -89,6 +93,7 @@ def _configure_server( """ + self._cleanup_shm_handles() shm_ip0_handle = cshm.create_shared_memory_region( "input0_data", create_byte_size, device_id ) @@ -131,21 +136,28 @@ def _configure_server( device_id, register_byte_size, ) - return [shm_ip0_handle, shm_ip1_handle, shm_op0_handle, shm_op1_handle] - - def _cleanup_server(self, shm_handles): - for shm_handle in shm_handles: + self._shm_handles = [ + shm_ip0_handle, + 
shm_ip1_handle, + shm_op0_handle, + shm_op1_handle, + ] + + def _cleanup_shm_handles(self): + for shm_handle in self._shm_handles: cshm.destroy_shared_memory_region(shm_handle) + self._shm_handles = [] class CudaSharedMemoryTest(CudaSharedMemoryTestBase): def test_invalid_create_shm(self): # Raises error since tried to create invalid cuda shared memory region - try: - shm_op0_handle = cshm.create_shared_memory_region("dummy_data", -1, 0) - cshm.destroy_shared_memory_region(shm_op0_handle) - except Exception as ex: - self.assertEqual(str(ex), "unable to create cuda shared memory handle") + with self.assertRaisesRegex( + cshm.SharedMemoryException, "unable to create cuda shared memory handle" + ): + self._shm_handles.append( + cshm.create_shared_memory_region("dummy_data", -1, 0) + ) def test_valid_create_set_register(self): # Create a valid cuda shared memory region, fill data in it and register @@ -212,14 +224,14 @@ def test_reregister_after_register(self): def test_unregister_after_inference(self): # Unregister after inference error_msg = [] - shm_handles = self._configure_server() + self._configure_server() iu.shm_basic_infer( self, self.triton_client, - shm_handles[0], - shm_handles[1], - shm_handles[2], - shm_handles[3], + self._shm_handles[0], + self._shm_handles[1], + self._shm_handles[2], + self._shm_handles[3], error_msg, protocol=self.protocol, use_cuda_shared_memory=True, @@ -233,19 +245,19 @@ def test_unregister_after_inference(self): self.assertEqual(len(shm_status), 3) else: self.assertEqual(len(shm_status.regions), 3) - self._cleanup_server(shm_handles) + self._cleanup_shm_handles() def test_register_after_inference(self): # Register after inference error_msg = [] - shm_handles = self._configure_server() + self._configure_server() iu.shm_basic_infer( self, self.triton_client, - shm_handles[0], - shm_handles[1], - shm_handles[2], - shm_handles[3], + self._shm_handles[0], + self._shm_handles[1], + self._shm_handles[2], + self._shm_handles[3], error_msg, protocol=self.protocol, use_cuda_shared_memory=True, @@ -261,13 +273,13 @@ def test_register_after_inference(self): self.assertEqual(len(shm_status), 5) else: self.assertEqual(len(shm_status.regions), 5) - shm_handles.append(shm_ip2_handle) - self._cleanup_server(shm_handles) + self._shm_handles.append(shm_ip2_handle) + self._cleanup_shm_handles() def test_too_big_shm(self): # Shared memory input region larger than needed - Throws error error_msg = [] - shm_handles = self._configure_server() + self._configure_server() shm_ip2_handle = cshm.create_shared_memory_region("input2_data", 128, 0) self.triton_client.register_cuda_shared_memory( "input2_data", cshm.get_raw_handle(shm_ip2_handle), 0, 128 @@ -275,10 +287,10 @@ def test_too_big_shm(self): iu.shm_basic_infer( self, self.triton_client, - shm_handles[0], + self._shm_handles[0], shm_ip2_handle, - shm_handles[2], - shm_handles[3], + self._shm_handles[2], + self._shm_handles[3], error_msg, big_shm_name="input2_data", big_shm_size=128, @@ -290,21 +302,21 @@ def test_too_big_shm(self): "input byte size mismatch for input 'INPUT1' for model 'simple'. 
Expected 64, got 128", error_msg[-1], ) - shm_handles.append(shm_ip2_handle) - self._cleanup_server(shm_handles) + self._shm_handles.append(shm_ip2_handle) + self._cleanup_shm_handles() def test_mixed_raw_shm(self): # Mix of shared memory and RAW inputs error_msg = [] - shm_handles = self._configure_server() + self._configure_server() input1_data = np.ones(shape=16, dtype=np.int32) iu.shm_basic_infer( self, self.triton_client, - shm_handles[0], + self._shm_handles[0], [input1_data], - shm_handles[2], - shm_handles[3], + self._shm_handles[2], + self._shm_handles[3], error_msg, protocol=self.protocol, use_cuda_shared_memory=True, @@ -312,11 +324,11 @@ def test_mixed_raw_shm(self): if len(error_msg) > 0: raise Exception(error_msg[-1]) - self._cleanup_server(shm_handles) + self._cleanup_shm_handles() def test_unregisterall(self): # Unregister all shared memory blocks - shm_handles = self._configure_server() + self._configure_server() status_before = self.triton_client.get_cuda_shared_memory_status() if self.protocol == "http": self.assertEqual(len(status_before), 4) @@ -328,7 +340,7 @@ def test_unregisterall(self): self.assertEqual(len(status_after), 0) else: self.assertEqual(len(status_after.regions), 0) - self._cleanup_server(shm_handles) + self._cleanup_shm_handles() def test_register_out_of_bound(self): create_byte_size = self.DEFAULT_SHM_BYTE_SIZE @@ -345,7 +357,7 @@ def test_register_out_of_bound(self): def test_infer_offset_out_of_bound(self): # CUDA Shared memory offset outside output region - Throws error error_msg = [] - shm_handles = self._configure_server() + self._configure_server() if self.protocol == "http": # -32 when placed in an int64 signed type, to get a negative offset # by overflowing @@ -357,10 +369,10 @@ def test_infer_offset_out_of_bound(self): iu.shm_basic_infer( self, self.triton_client, - shm_handles[0], - shm_handles[1], - shm_handles[2], - shm_handles[3], + self._shm_handles[0], + self._shm_handles[1], + self._shm_handles[2], + self._shm_handles[3], error_msg, shm_output_offset=offset, protocol=self.protocol, @@ -370,22 +382,22 @@ def test_infer_offset_out_of_bound(self): self.assertEqual(len(error_msg), 1) self.assertIn("Invalid offset for shared memory region", error_msg[0]) - self._cleanup_server(shm_handles) + self._cleanup_shm_handles() def test_infer_byte_size_out_of_bound(self): # Shared memory byte_size outside output region - Throws error error_msg = [] - shm_handles = self._configure_server() + self._configure_server() offset = 60 byte_size = self.DEFAULT_SHM_BYTE_SIZE iu.shm_basic_infer( self, self.triton_client, - shm_handles[0], - shm_handles[1], - shm_handles[2], - shm_handles[3], + self._shm_handles[0], + self._shm_handles[1], + self._shm_handles[2], + self._shm_handles[3], error_msg, shm_output_offset=offset, shm_output_byte_size=byte_size, @@ -397,7 +409,7 @@ def test_infer_byte_size_out_of_bound(self): self.assertIn( "Invalid offset + byte size for shared memory region", error_msg[0] ) - self._cleanup_server(shm_handles) + self._cleanup_shm_handles() class TestCudaSharedMemoryUnregister(CudaSharedMemoryTestBase): @@ -437,198 +449,182 @@ def _test_shm_found(self, shm_names): self.assertIn(shm_info["name"], shm_names) def test_unregister_shm_during_inference_http(self): - try: - self.triton_client.unregister_cuda_shared_memory() - shm_handles = self._configure_server() - shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"] - - inputs = [ - httpclient.InferInput("INPUT0", [1, 16], "INT32"), - httpclient.InferInput("INPUT1", [1, 
16], "INT32"), - ] - outputs = [ - httpclient.InferRequestedOutput("OUTPUT0", binary_data=True), - httpclient.InferRequestedOutput("OUTPUT1", binary_data=False), - ] - - inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE) - inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE) - outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE) - outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE) - - async_request = self.triton_client.async_infer( - model_name="simple", inputs=inputs, outputs=outputs - ) - - # Ensure inference started - time.sleep(2) + self.triton_client.unregister_cuda_shared_memory() + self._configure_server() + shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"] + + inputs = [ + httpclient.InferInput("INPUT0", [1, 16], "INT32"), + httpclient.InferInput("INPUT1", [1, 16], "INT32"), + ] + outputs = [ + httpclient.InferRequestedOutput("OUTPUT0", binary_data=True), + httpclient.InferRequestedOutput("OUTPUT1", binary_data=False), + ] + + inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE) + inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE) + + async_request = self.triton_client.async_infer( + model_name="simple", inputs=inputs, outputs=outputs + ) - # Try unregister shm regions during inference - self._test_unregister_shm_request_pass(shm_names) + # Ensure inference started + time.sleep(2) - # Blocking call - async_request.get_result() + # Try unregister shm regions during inference + self._test_unregister_shm_request_pass(shm_names) - # Test that all shm regions are successfully unregistered after inference without needing to call unregister again. - self._test_shm_not_found(shm_names) + # Blocking call + async_request.get_result() - finally: - self._cleanup_server(shm_handles) + # Test that all shm regions are successfully unregistered after inference without needing to call unregister again. 
+        self._test_shm_not_found(shm_names)

     def test_unregister_shm_after_inference_http(self):
-        try:
-            self.triton_client.unregister_cuda_shared_memory()
-            shm_handles = self._configure_server()
-            shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"]
-
-            inputs = [
-                httpclient.InferInput("INPUT0", [1, 16], "INT32"),
-                httpclient.InferInput("INPUT1", [1, 16], "INT32"),
-            ]
-            outputs = [
-                httpclient.InferRequestedOutput("OUTPUT0", binary_data=True),
-                httpclient.InferRequestedOutput("OUTPUT1", binary_data=False),
-            ]
-
-            inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE)
-            inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE)
-            outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE)
-            outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE)
-
-            async_request = self.triton_client.async_infer(
-                model_name="simple", inputs=inputs, outputs=outputs
-            )
-
-            # Ensure inference started
-            time.sleep(2)
+        self.triton_client.unregister_cuda_shared_memory()
+        self._configure_server()
+        shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"]
+
+        inputs = [
+            httpclient.InferInput("INPUT0", [1, 16], "INT32"),
+            httpclient.InferInput("INPUT1", [1, 16], "INT32"),
+        ]
+        outputs = [
+            httpclient.InferRequestedOutput("OUTPUT0", binary_data=True),
+            httpclient.InferRequestedOutput("OUTPUT1", binary_data=False),
+        ]
+
+        inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE)
+        inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE)
+        outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE)
+        outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE)
+
+        async_request = self.triton_client.async_infer(
+            model_name="simple", inputs=inputs, outputs=outputs
+        )

-            # Test all registered shm regions exist during inference.
-            self._test_shm_found(shm_names)
+        # Ensure inference started
+        time.sleep(2)

-            # Blocking call
-            async_request.get_result()
+        # Test all registered shm regions exist during inference.
+        self._test_shm_found(shm_names)

-            # Test all registered shm regions exist after inference, as unregister API have not been called.
-            self._test_shm_found(shm_names)
+        # Blocking call
+        async_request.get_result()

-            # Test all shm regions are successfully unregistered after calling the unregister API after inference completed.
-            self.triton_client.unregister_cuda_shared_memory()
-            self._test_shm_not_found(shm_names)
+        # Test all registered shm regions exist after inference, as the unregister API has not been called.
+        self._test_shm_found(shm_names)

-        finally:
-            self._cleanup_server(shm_handles)
+        # Test all shm regions are successfully unregistered after calling the unregister API after inference completed.
+ self.triton_client.unregister_cuda_shared_memory() + self._test_shm_not_found(shm_names) def test_unregister_shm_during_inference_grpc(self): - try: - self.triton_client.unregister_cuda_shared_memory() - shm_handles = self._configure_server() - shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"] - - inputs = [ - grpcclient.InferInput("INPUT0", [1, 16], "INT32"), - grpcclient.InferInput("INPUT1", [1, 16], "INT32"), - ] - outputs = [ - grpcclient.InferRequestedOutput("OUTPUT0"), - grpcclient.InferRequestedOutput("OUTPUT1"), - ] - - inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE) - inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE) - outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE) - outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE) - - def callback(user_data, result, error): - if error: - user_data.append(error) - else: - user_data.append(result) - - user_data = [] - - self.triton_client.async_infer( - model_name="simple", - inputs=inputs, - outputs=outputs, - callback=partial(callback, user_data), - ) - - # Ensure inference started - time.sleep(2) + self.triton_client.unregister_cuda_shared_memory() + self._configure_server() + shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"] + + inputs = [ + grpcclient.InferInput("INPUT0", [1, 16], "INT32"), + grpcclient.InferInput("INPUT1", [1, 16], "INT32"), + ] + outputs = [ + grpcclient.InferRequestedOutput("OUTPUT0"), + grpcclient.InferRequestedOutput("OUTPUT1"), + ] + + inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE) + inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE) + + def callback(user_data, result, error): + if error: + user_data.append(error) + else: + user_data.append(result) + + user_data = [] + + self.triton_client.async_infer( + model_name="simple", + inputs=inputs, + outputs=outputs, + callback=partial(callback, user_data), + ) - # Try unregister shm regions during inference - self._test_unregister_shm_request_pass(shm_names) + # Ensure inference started + time.sleep(2) - # Wait until the results are available in user_data - time_out = 20 - while (len(user_data) == 0) and time_out > 0: - time_out = time_out - 1 - time.sleep(1) - time.sleep(2) + # Try unregister shm regions during inference + self._test_unregister_shm_request_pass(shm_names) - # Test that all shm regions are successfully unregistered after inference without needing to call unregister again. - self._test_shm_not_found(shm_names) + # Wait until the results are available in user_data + time_out = 20 + while (len(user_data) == 0) and time_out > 0: + time_out = time_out - 1 + time.sleep(1) + time.sleep(2) - finally: - self._cleanup_server(shm_handles) + # Test that all shm regions are successfully unregistered after inference without needing to call unregister again. 
+        self._test_shm_not_found(shm_names)

     def test_unregister_shm_after_inference_grpc(self):
-        try:
-            self.triton_client.unregister_cuda_shared_memory()
-            shm_handles = self._configure_server()
-            shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"]
-
-            inputs = [
-                grpcclient.InferInput("INPUT0", [1, 16], "INT32"),
-                grpcclient.InferInput("INPUT1", [1, 16], "INT32"),
-            ]
-            outputs = [
-                grpcclient.InferRequestedOutput("OUTPUT0"),
-                grpcclient.InferRequestedOutput("OUTPUT1"),
-            ]
-
-            inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE)
-            inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE)
-            outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE)
-            outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE)
-
-            def callback(user_data, result, error):
-                if error:
-                    user_data.append(error)
-                else:
-                    user_data.append(result)
-
-            user_data = []
-
-            self.triton_client.async_infer(
-                model_name="simple",
-                inputs=inputs,
-                outputs=outputs,
-                callback=partial(callback, user_data),
-            )
-
-            # Ensure inference started
-            time.sleep(2)
+        self.triton_client.unregister_cuda_shared_memory()
+        self._configure_server()
+        shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"]
+
+        inputs = [
+            grpcclient.InferInput("INPUT0", [1, 16], "INT32"),
+            grpcclient.InferInput("INPUT1", [1, 16], "INT32"),
+        ]
+        outputs = [
+            grpcclient.InferRequestedOutput("OUTPUT0"),
+            grpcclient.InferRequestedOutput("OUTPUT1"),
+        ]
+
+        inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE)
+        inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE)
+        outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE)
+        outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE)
+
+        def callback(user_data, result, error):
+            if error:
+                user_data.append(error)
+            else:
+                user_data.append(result)
+
+        user_data = []
+
+        self.triton_client.async_infer(
+            model_name="simple",
+            inputs=inputs,
+            outputs=outputs,
+            callback=partial(callback, user_data),
+        )

-            # Test all registered shm regions exist during inference.
-            self._test_shm_found(shm_names)
+        # Ensure inference started
+        time.sleep(2)

-            # Wait until the results are available in user_data
-            time_out = 20
-            while (len(user_data) == 0) and time_out > 0:
-                time_out = time_out - 1
-                time.sleep(1)
-            time.sleep(2)
+        # Test all registered shm regions exist during inference.
+        self._test_shm_found(shm_names)

-            # Test all registered shm regions exist after inference, as unregister API have not been called.
-            self._test_shm_found(shm_names)
+        # Wait until the results are available in user_data
+        time_out = 20
+        while (len(user_data) == 0) and time_out > 0:
+            time_out = time_out - 1
+            time.sleep(1)
+        time.sleep(2)

-            # Test all shm regions are successfully unregistered after calling the unregister API after inference completed.
-            self.triton_client.unregister_cuda_shared_memory()
-            self._test_shm_not_found(shm_names)
+        # Test all registered shm regions exist after inference, as the unregister API has not been called.
+        self._test_shm_found(shm_names)

-        finally:
-            self._cleanup_server(shm_handles)
+        # Test all shm regions are successfully unregistered after calling the unregister API after inference completed.
+ self.triton_client.unregister_cuda_shared_memory() + self._test_shm_not_found(shm_names) if __name__ == "__main__": From ad9aca0901ee09909df9d986a27e881e9f2a1c0d Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Mon, 18 Nov 2024 11:08:04 +0530 Subject: [PATCH 12/13] Update CI --- .../cuda_shared_memory_test.py | 239 ++++++++++-------- qa/L0_cuda_shared_memory/test.sh | 73 +++--- qa/L0_shared_memory/shared_memory_test.py | 239 ++++++++++-------- qa/L0_shared_memory/test.sh | 72 +++--- 4 files changed, 341 insertions(+), 282 deletions(-) diff --git a/qa/L0_cuda_shared_memory/cuda_shared_memory_test.py b/qa/L0_cuda_shared_memory/cuda_shared_memory_test.py index 77634e0f5d..d4a91ce8c8 100755 --- a/qa/L0_cuda_shared_memory/cuda_shared_memory_test.py +++ b/qa/L0_cuda_shared_memory/cuda_shared_memory_test.py @@ -142,6 +142,7 @@ def _configure_server( shm_op0_handle, shm_op1_handle, ] + self.shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"] def _cleanup_shm_handles(self): for shm_handle in self._shm_handles: @@ -412,9 +413,46 @@ def test_infer_byte_size_out_of_bound(self): self._cleanup_shm_handles() +def callback(user_data, result, error): + if error: + user_data.append(error) + else: + user_data.append(result) + + class TestCudaSharedMemoryUnregister(CudaSharedMemoryTestBase): - def _test_unregister_shm_request_pass(self, shm_names): - self._test_shm_found(shm_names) + def _create_request_data(self): + self.triton_client.unregister_cuda_shared_memory() + self._configure_server() + + if self.protocol == "http": + inputs = [ + httpclient.InferInput("INPUT0", [1, 16], "INT32"), + httpclient.InferInput("INPUT1", [1, 16], "INT32"), + ] + outputs = [ + httpclient.InferRequestedOutput("OUTPUT0", binary_data=True), + httpclient.InferRequestedOutput("OUTPUT1", binary_data=False), + ] + else: + inputs = [ + grpcclient.InferInput("INPUT0", [1, 16], "INT32"), + grpcclient.InferInput("INPUT1", [1, 16], "INT32"), + ] + outputs = [ + grpcclient.InferRequestedOutput("OUTPUT0"), + grpcclient.InferRequestedOutput("OUTPUT1"), + ] + + inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE) + inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE) + + return inputs, outputs + + def _test_unregister_shm_request_pass(self): + self._test_shm_found() # Unregister all should not result in an error. # If shared memory regions are in use, they will be marked and unregistered after the inference is completed. 
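[Editor's note: illustrative sketch, not part of the patch.] The deferred-unregistration contract exercised by the tests below can be summarized from the client's point of view. This sketch assumes a running server with the "simple" model and `inputs`/`outputs` built exactly as in `_create_request_data` above; all calls shown are the same tritonclient APIs used elsewhere in this file.

    import time

    import tritonclient.http as httpclient

    client = httpclient.InferenceServerClient("localhost:8000")

    # All four regions are registered and visible before the request.
    assert len(client.get_cuda_shared_memory_status()) == 4

    # Unregister while an inference that uses the regions is in flight:
    # the call succeeds, but the regions are only *marked* for removal.
    async_request = client.async_infer("simple", inputs=inputs, outputs=outputs)
    time.sleep(2)  # as in the tests, give the request time to start
    client.unregister_cuda_shared_memory()
    assert len(client.get_cuda_shared_memory_status()) == 4  # still held

    # Once the response has been delivered, the marked regions are freed
    # without any further unregister call.
    async_request.get_result()
    assert len(client.get_cuda_shared_memory_status()) == 0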
@@ -424,13 +462,12 @@ def _test_unregister_shm_request_pass(self, shm_names): second_client.unregister_cuda_shared_memory() # Number of shared memory regions should be the same as the inference is not completed yet - self._test_shm_found(shm_names) + self._test_shm_found() - def _test_shm_not_found(self, shm_names): - self.assertGreater(len(shm_names), 0) + def _test_shm_not_found(self): second_client = httpclient.InferenceServerClient("localhost:8000", verbose=True) - for shm_name in shm_names: + for shm_name in self.shm_names: with self.assertRaises(InferenceServerException) as ex: second_client.get_cuda_shared_memory_status(shm_name) self.assertIn( @@ -438,34 +475,17 @@ def _test_shm_not_found(self, shm_names): str(ex.exception), ) - def _test_shm_found(self, shm_names): - self.assertGreater(len(shm_names), 0) + def _test_shm_found(self): second_client = httpclient.InferenceServerClient("localhost:8000", verbose=True) status = second_client.get_cuda_shared_memory_status() - self.assertEqual(len(status), len(shm_names)) + self.assertEqual(len(status), len(self.shm_names)) for shm_info in status: - self.assertIn(shm_info["name"], shm_names) + self.assertIn(shm_info["name"], self.shm_names) - def test_unregister_shm_during_inference_http(self): - self.triton_client.unregister_cuda_shared_memory() - self._configure_server() - shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"] - - inputs = [ - httpclient.InferInput("INPUT0", [1, 16], "INT32"), - httpclient.InferInput("INPUT1", [1, 16], "INT32"), - ] - outputs = [ - httpclient.InferRequestedOutput("OUTPUT0", binary_data=True), - httpclient.InferRequestedOutput("OUTPUT1", binary_data=False), - ] - - inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE) - inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE) - outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE) - outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE) + def test_unregister_shm_during_inference_single_req_http(self): + inputs, outputs = self._create_request_data() async_request = self.triton_client.async_infer( model_name="simple", inputs=inputs, outputs=outputs @@ -475,32 +495,49 @@ def test_unregister_shm_during_inference_http(self): time.sleep(2) # Try unregister shm regions during inference - self._test_unregister_shm_request_pass(shm_names) + self._test_unregister_shm_request_pass() # Blocking call async_request.get_result() # Test that all shm regions are successfully unregistered after inference without needing to call unregister again. 
- self._test_shm_not_found(shm_names) + self._test_shm_not_found() - def test_unregister_shm_after_inference_http(self): - self.triton_client.unregister_cuda_shared_memory() - self._configure_server() - shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"] + def test_unregister_shm_during_inference_multiple_req_http(self): + inputs, outputs = self._create_request_data() - inputs = [ - httpclient.InferInput("INPUT0", [1, 16], "INT32"), - httpclient.InferInput("INPUT1", [1, 16], "INT32"), - ] - outputs = [ - httpclient.InferRequestedOutput("OUTPUT0", binary_data=True), - httpclient.InferRequestedOutput("OUTPUT1", binary_data=False), - ] + # Place the first request + async_request = self.triton_client.async_infer( + model_name="simple", inputs=inputs, outputs=outputs + ) + # Ensure inference started + time.sleep(2) - inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE) - inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE) - outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE) - outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE) + # Try unregister shm regions during inference + self._test_unregister_shm_request_pass() + time.sleep(2) + + # Place the second request + second_client = httpclient.InferenceServerClient("localhost:8000", verbose=True) + second_async_request = second_client.async_infer( + model_name="simple", inputs=inputs, outputs=outputs + ) + + # Blocking call + async_request.get_result() + + # Shm regions will remain available as the second request is still in progress + self._test_shm_found() + + # Blocking call + second_async_request.get_result() + + # Verify that all shm regions are successfully unregistered once all inference requests have completed, + # without needing to manually call unregister again. + self._test_shm_not_found() + + def test_unregister_shm_after_inference_http(self): + inputs, outputs = self._create_request_data() async_request = self.triton_client.async_infer( model_name="simple", inputs=inputs, outputs=outputs @@ -510,43 +547,20 @@ def test_unregister_shm_after_inference_http(self): time.sleep(2) # Test all registered shm regions exist during inference. - self._test_shm_found(shm_names) + self._test_shm_found() # Blocking call async_request.get_result() # Test all registered shm regions exist after inference, as unregister API have not been called. - self._test_shm_found(shm_names) + self._test_shm_found() # Test all shm regions are successfully unregistered after calling the unregister API after inference completed. 
self.triton_client.unregister_cuda_shared_memory() - self._test_shm_not_found(shm_names) - - def test_unregister_shm_during_inference_grpc(self): - self.triton_client.unregister_cuda_shared_memory() - self._configure_server() - shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"] - - inputs = [ - grpcclient.InferInput("INPUT0", [1, 16], "INT32"), - grpcclient.InferInput("INPUT1", [1, 16], "INT32"), - ] - outputs = [ - grpcclient.InferRequestedOutput("OUTPUT0"), - grpcclient.InferRequestedOutput("OUTPUT1"), - ] - - inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE) - inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE) - outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE) - outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE) - - def callback(user_data, result, error): - if error: - user_data.append(error) - else: - user_data.append(result) + self._test_shm_not_found() + def test_unregister_shm_during_inference_single_req_grpc(self): + inputs, outputs = self._create_request_data() user_data = [] self.triton_client.async_infer( @@ -560,7 +574,7 @@ def callback(user_data, result, error): time.sleep(2) # Try unregister shm regions during inference - self._test_unregister_shm_request_pass(shm_names) + self._test_unregister_shm_request_pass() # Wait until the results are available in user_data time_out = 20 @@ -570,33 +584,58 @@ def callback(user_data, result, error): time.sleep(2) # Test that all shm regions are successfully unregistered after inference without needing to call unregister again. - self._test_shm_not_found(shm_names) + self._test_shm_not_found() - def test_unregister_shm_after_inference_grpc(self): - self.triton_client.unregister_cuda_shared_memory() - self._configure_server() - shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"] + def test_unregister_shm_during_inference_multiple_req_grpc(self): + inputs, outputs = self._create_request_data() + user_data = [] - inputs = [ - grpcclient.InferInput("INPUT0", [1, 16], "INT32"), - grpcclient.InferInput("INPUT1", [1, 16], "INT32"), - ] - outputs = [ - grpcclient.InferRequestedOutput("OUTPUT0"), - grpcclient.InferRequestedOutput("OUTPUT1"), - ] + self.triton_client.async_infer( + model_name="simple", + inputs=inputs, + outputs=outputs, + callback=partial(callback, user_data), + ) - inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE) - inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE) - outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE) - outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE) + # Ensure inference started + time.sleep(2) - def callback(user_data, result, error): - if error: - user_data.append(error) - else: - user_data.append(result) + # Try unregister shm regions during inference + self._test_unregister_shm_request_pass() + # Place the second request + second_user_data = [] + second_client = grpcclient.InferenceServerClient("localhost:8001", verbose=True) + second_client.async_infer( + model_name="simple", + inputs=inputs, + outputs=outputs, + callback=partial(callback, second_user_data), + ) + + # Wait until the 1st request results are available in user_data + time_out = 10 + while (len(user_data) == 0) and time_out > 0: + time_out = time_out - 1 + time.sleep(1) + time.sleep(2) + + # Shm regions will remain available as the second request is still in progress + self._test_shm_found() + + # Wait until the 2nd request 
results are available in user_data + time_out = 20 + while (len(second_user_data) == 0) and time_out > 0: + time_out = time_out - 1 + time.sleep(1) + time.sleep(2) + + # Verify that all shm regions are successfully unregistered once all inference requests have completed, + # without needing to manually call unregister again. + self._test_shm_not_found() + + def test_unregister_shm_after_inference_grpc(self): + inputs, outputs = self._create_request_data() user_data = [] self.triton_client.async_infer( @@ -610,7 +649,7 @@ def callback(user_data, result, error): time.sleep(2) # Test all registered shm regions exist during inference. - self._test_shm_found(shm_names) + self._test_shm_found() # Wait until the results are available in user_data time_out = 20 @@ -620,11 +659,11 @@ def callback(user_data, result, error): time.sleep(2) # Test all registered shm regions exist after inference, as unregister API have not been called. - self._test_shm_found(shm_names) + self._test_shm_found() # Test all shm regions are successfully unregistered after calling the unregister API after inference completed. self.triton_client.unregister_cuda_shared_memory() - self._test_shm_not_found(shm_names) + self._test_shm_not_found() if __name__ == "__main__": diff --git a/qa/L0_cuda_shared_memory/test.sh b/qa/L0_cuda_shared_memory/test.sh index 6e5ef50942..028399756e 100755 --- a/qa/L0_cuda_shared_memory/test.sh +++ b/qa/L0_cuda_shared_memory/test.sh @@ -89,56 +89,47 @@ cp ../python_models/execute_delayed_model/model.py ./python_models/simple/1/ cp ../python_models/execute_delayed_model/config.pbtxt ./python_models/simple/ sed -i 's/KIND_CPU/KIND_GPU/g' ./python_models/simple/config.pbtxt -for client_type in http grpc; do - SERVER_ARGS="--model-repository=`pwd`/python_models --log-verbose=1 ${SERVER_ARGS_EXTRA}" - SERVER_LOG="./unregister_shm.$client_type.server.log" - run_server - if [ "$SERVER_PID" == "0" ]; then - echo -e "\n***\n*** Failed to start $SERVER\n***" - cat $SERVER_LOG - exit 1 - fi - export CLIENT_TYPE=$client_type - CLIENT_LOG="./unregister_shm_during_inference_$client_type.client.log" - set +e - python3 $SHM_TEST TestCudaSharedMemoryUnregister.test_unregister_shm_during_inference_$client_type >>$CLIENT_LOG 2>&1 - if [ $? -ne 0 ]; then - cat $CLIENT_LOG - echo -e "\n***\n*** Test Failed\n***" - RET=1 - else - check_test_results $TEST_RESULT_FILE 1 +for test_case in \ + test_unregister_shm_during_inference_single_req \ + test_unregister_shm_during_inference_multiple_req \ + test_unregister_shm_after_inference; do + for client_type in http grpc; do + SERVER_ARGS="--model-repository=`pwd`/python_models --log-verbose=1 ${SERVER_ARGS_EXTRA}" + SERVER_LOG="./${test_case}_${client_type}.server.log" + run_server + if [ "$SERVER_PID" == "0" ]; then + echo -e "\n***\n*** Failed to start $SERVER\n***" + cat $SERVER_LOG + exit 1 + fi + + export CLIENT_TYPE=$client_type + CLIENT_LOG="./${test_case}_${client_type}.client.log" + set +e + python3 $SHM_TEST "TestCudaSharedMemoryUnregister.${test_case}_${client_type}" >>"$CLIENT_LOG" 2>&1 if [ $? -ne 0 ]; then - cat $TEST_RESULT_FILE - echo -e "\n***\n*** Test Result Verification Failed\n***" + cat $CLIENT_LOG + echo -e "\n***\n*** Test Failed - ${test_case}_${client_type}\n***" RET=1 + else + check_test_results $TEST_RESULT_FILE 1 + if [ $? 
-ne 0 ]; then + cat $TEST_RESULT_FILE + echo -e "\n***\n*** Test Result Verification Failed - ${test_case}_${client_type}\n***" + RET=1 + fi fi - fi - CLIENT_LOG="./unregister_shm_after_inference_$client_type.client.log" - python3 $SHM_TEST TestCudaSharedMemoryUnregister.test_unregister_shm_after_inference_$client_type >>$CLIENT_LOG 2>&1 - if [ $? -ne 0 ]; then - cat $CLIENT_LOG - echo -e "\n***\n*** Test Failed\n***" - RET=1 - else - check_test_results $TEST_RESULT_FILE 1 + kill $SERVER_PID + wait $SERVER_PID if [ $? -ne 0 ]; then - cat $TEST_RESULT_FILE - echo -e "\n***\n*** Test Result Verification Failed\n***" + echo -e "\n***\n*** Test Server shut down non-gracefully\n***" RET=1 fi - fi - - kill $SERVER_PID - wait $SERVER_PID - if [ $? -ne 0 ]; then - echo -e "\n***\n*** Test Server shut down non-gracefully\n***" - RET=1 - fi - set -e + set -e done +done if [ $RET -eq 0 ]; then echo -e "\n***\n*** Test Passed\n***" diff --git a/qa/L0_shared_memory/shared_memory_test.py b/qa/L0_shared_memory/shared_memory_test.py index a529f778b0..8f3c2fbb52 100755 --- a/qa/L0_shared_memory/shared_memory_test.py +++ b/qa/L0_shared_memory/shared_memory_test.py @@ -129,6 +129,7 @@ def _configure_server( self.triton_client.register_system_shared_memory( "output1_data", "/output1_data", register_byte_size, offset=register_offset ) + self.shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"] def _cleanup_shm_handles(self): for shm_handle in self._shm_handles: @@ -456,9 +457,46 @@ def test_python_client_leak(self): ) +def callback(user_data, result, error): + if error: + user_data.append(error) + else: + user_data.append(result) + + class TestSharedMemoryUnregister(SystemSharedMemoryTestBase): - def _test_unregister_shm_request_pass(self, shm_names): - self._test_shm_found(shm_names) + def _create_request_data(self): + self.triton_client.unregister_system_shared_memory() + self._configure_server() + + if self.protocol == "http": + inputs = [ + httpclient.InferInput("INPUT0", [1, 16], "INT32"), + httpclient.InferInput("INPUT1", [1, 16], "INT32"), + ] + outputs = [ + httpclient.InferRequestedOutput("OUTPUT0", binary_data=True), + httpclient.InferRequestedOutput("OUTPUT1", binary_data=False), + ] + else: + inputs = [ + grpcclient.InferInput("INPUT0", [1, 16], "INT32"), + grpcclient.InferInput("INPUT1", [1, 16], "INT32"), + ] + outputs = [ + grpcclient.InferRequestedOutput("OUTPUT0"), + grpcclient.InferRequestedOutput("OUTPUT1"), + ] + + inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE) + inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE) + + return inputs, outputs + + def _test_unregister_shm_request_pass(self): + self._test_shm_found() # Unregister all should not result in an error. # If shared memory regions are in use, they will be marked and unregistered after the inference is completed. 
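[Editor's note: illustrative sketch, not part of the patch.] Both test files branch on `self.protocol` when building requests, and test.sh exports CLIENT_TYPE for each run; the base classes that connect the two are not shown in this series. A minimal sketch of that plumbing, assuming the environment variable maps directly onto the protocol:

    import os

    import tritonclient.grpc as grpcclient
    import tritonclient.http as httpclient

    protocol = os.environ.get("CLIENT_TYPE", "http")
    if protocol == "http":
        # HTTP endpoint, matching the second_client helpers above.
        triton_client = httpclient.InferenceServerClient("localhost:8000", verbose=True)
    else:
        # gRPC endpoint, matching the port used in the grpc tests.
        triton_client = grpcclient.InferenceServerClient("localhost:8001", verbose=True)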
@@ -468,13 +506,12 @@ def _test_unregister_shm_request_pass(self, shm_names): second_client.unregister_system_shared_memory() # Number of shared memory regions should be the same as the inference is not completed yet - self._test_shm_found(shm_names) + self._test_shm_found() - def _test_shm_not_found(self, shm_names): - self.assertGreater(len(shm_names), 0) + def _test_shm_not_found(self): second_client = httpclient.InferenceServerClient("localhost:8000", verbose=True) - for shm_name in shm_names: + for shm_name in self.shm_names: with self.assertRaises(utils.InferenceServerException) as ex: second_client.get_system_shared_memory_status(shm_name) self.assertIn( @@ -482,34 +519,17 @@ def _test_shm_not_found(self, shm_names): str(ex.exception), ) - def _test_shm_found(self, shm_names): - self.assertGreater(len(shm_names), 0) + def _test_shm_found(self): second_client = httpclient.InferenceServerClient("localhost:8000", verbose=True) status = second_client.get_system_shared_memory_status() - self.assertEqual(len(status), len(shm_names)) + self.assertEqual(len(status), len(self.shm_names)) for shm_info in status: - self.assertIn(shm_info["name"], shm_names) + self.assertIn(shm_info["name"], self.shm_names) - def test_unregister_shm_during_inference_http(self): - self.triton_client.unregister_system_shared_memory() - self._configure_server() - shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"] - - inputs = [ - httpclient.InferInput("INPUT0", [1, 16], "INT32"), - httpclient.InferInput("INPUT1", [1, 16], "INT32"), - ] - outputs = [ - httpclient.InferRequestedOutput("OUTPUT0", binary_data=True), - httpclient.InferRequestedOutput("OUTPUT1", binary_data=False), - ] - - inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE) - inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE) - outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE) - outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE) + def test_unregister_shm_during_inference_single_req_http(self): + inputs, outputs = self._create_request_data() async_request = self.triton_client.async_infer( model_name="simple", inputs=inputs, outputs=outputs @@ -519,32 +539,49 @@ def test_unregister_shm_during_inference_http(self): time.sleep(2) # Try unregister shm regions during inference - self._test_unregister_shm_request_pass(shm_names) + self._test_unregister_shm_request_pass() # Blocking call async_request.get_result() # Test that all shm regions are successfully unregistered after inference without needing to call unregister again. 
- self._test_shm_not_found(shm_names) + self._test_shm_not_found() - def test_unregister_shm_after_inference_http(self): - self.triton_client.unregister_system_shared_memory() - self._configure_server() - shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"] + def test_unregister_shm_during_inference_multiple_req_http(self): + inputs, outputs = self._create_request_data() - inputs = [ - httpclient.InferInput("INPUT0", [1, 16], "INT32"), - httpclient.InferInput("INPUT1", [1, 16], "INT32"), - ] - outputs = [ - httpclient.InferRequestedOutput("OUTPUT0", binary_data=True), - httpclient.InferRequestedOutput("OUTPUT1", binary_data=False), - ] + # Place the first request + async_request = self.triton_client.async_infer( + model_name="simple", inputs=inputs, outputs=outputs + ) + # Ensure inference started + time.sleep(2) - inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE) - inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE) - outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE) - outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE) + # Try unregister shm regions during inference + self._test_unregister_shm_request_pass() + time.sleep(2) + + # Place the second request + second_client = httpclient.InferenceServerClient("localhost:8000", verbose=True) + second_async_request = second_client.async_infer( + model_name="simple", inputs=inputs, outputs=outputs + ) + + # Blocking call + async_request.get_result() + + # Shm regions will remain available as the second request is still in progress + self._test_shm_found() + + # Blocking call + second_async_request.get_result() + + # Verify that all shm regions are successfully unregistered once all inference requests have completed, + # without needing to manually call unregister again. + self._test_shm_not_found() + + def test_unregister_shm_after_inference_http(self): + inputs, outputs = self._create_request_data() async_request = self.triton_client.async_infer( model_name="simple", inputs=inputs, outputs=outputs @@ -554,43 +591,20 @@ def test_unregister_shm_after_inference_http(self): time.sleep(2) # Test all registered shm regions exist during inference. - self._test_shm_found(shm_names) + self._test_shm_found() # Blocking call async_request.get_result() # Test all registered shm regions exist after inference, as unregister API have not been called. - self._test_shm_found(shm_names) + self._test_shm_found() # Test all shm regions are successfully unregistered after calling the unregister API after inference completed. 
self.triton_client.unregister_system_shared_memory() - self._test_shm_not_found(shm_names) - - def test_unregister_shm_during_inference_grpc(self): - self.triton_client.unregister_system_shared_memory() - self._configure_server() - shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"] - - inputs = [ - grpcclient.InferInput("INPUT0", [1, 16], "INT32"), - grpcclient.InferInput("INPUT1", [1, 16], "INT32"), - ] - outputs = [ - grpcclient.InferRequestedOutput("OUTPUT0"), - grpcclient.InferRequestedOutput("OUTPUT1"), - ] - - inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE) - inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE) - outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE) - outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE) - - def callback(user_data, result, error): - if error: - user_data.append(error) - else: - user_data.append(result) + self._test_shm_not_found() + def test_unregister_shm_during_inference_single_req_grpc(self): + inputs, outputs = self._create_request_data() user_data = [] self.triton_client.async_infer( @@ -604,7 +618,7 @@ def callback(user_data, result, error): time.sleep(2) # Try unregister shm regions during inference - self._test_unregister_shm_request_pass(shm_names) + self._test_unregister_shm_request_pass() # Wait until the results are available in user_data time_out = 20 @@ -614,33 +628,58 @@ def callback(user_data, result, error): time.sleep(2) # Test that all shm regions are successfully unregistered after inference without needing to call unregister again. - self._test_shm_not_found(shm_names) + self._test_shm_not_found() - def test_unregister_shm_after_inference_grpc(self): - self.triton_client.unregister_system_shared_memory() - self._configure_server() - shm_names = ["input0_data", "input1_data", "output0_data", "output1_data"] + def test_unregister_shm_during_inference_multiple_req_grpc(self): + inputs, outputs = self._create_request_data() + user_data = [] - inputs = [ - grpcclient.InferInput("INPUT0", [1, 16], "INT32"), - grpcclient.InferInput("INPUT1", [1, 16], "INT32"), - ] - outputs = [ - grpcclient.InferRequestedOutput("OUTPUT0"), - grpcclient.InferRequestedOutput("OUTPUT1"), - ] + self.triton_client.async_infer( + model_name="simple", + inputs=inputs, + outputs=outputs, + callback=partial(callback, user_data), + ) - inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE) - inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE) - outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE) - outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE) + # Ensure inference started + time.sleep(2) - def callback(user_data, result, error): - if error: - user_data.append(error) - else: - user_data.append(result) + # Try unregister shm regions during inference + self._test_unregister_shm_request_pass() + # Place the second request + second_user_data = [] + second_client = grpcclient.InferenceServerClient("localhost:8001", verbose=True) + second_client.async_infer( + model_name="simple", + inputs=inputs, + outputs=outputs, + callback=partial(callback, second_user_data), + ) + + # Wait until the 1st request results are available in user_data + time_out = 10 + while (len(user_data) == 0) and time_out > 0: + time_out = time_out - 1 + time.sleep(1) + time.sleep(2) + + # Shm regions will remain available as the second request is still in progress + self._test_shm_found() + + # Wait until the 2nd 
request results are available in user_data + time_out = 20 + while (len(second_user_data) == 0) and time_out > 0: + time_out = time_out - 1 + time.sleep(1) + time.sleep(2) + + # Verify that all shm regions are successfully unregistered once all inference requests have completed, + # without needing to manually call unregister again. + self._test_shm_not_found() + + def test_unregister_shm_after_inference_grpc(self): + inputs, outputs = self._create_request_data() user_data = [] self.triton_client.async_infer( @@ -654,7 +693,7 @@ def callback(user_data, result, error): time.sleep(2) # Test all registered shm regions exist during inference. - self._test_shm_found(shm_names) + self._test_shm_found() # Wait until the results are available in user_data time_out = 20 @@ -664,11 +703,11 @@ def callback(user_data, result, error): time.sleep(2) # Test all registered shm regions exist after inference, as unregister API have not been called. - self._test_shm_found(shm_names) + self._test_shm_found() # Test all shm regions are successfully unregistered after calling the unregister API after inference completed. self.triton_client.unregister_system_shared_memory() - self._test_shm_not_found(shm_names) + self._test_shm_not_found() if __name__ == "__main__": diff --git a/qa/L0_shared_memory/test.sh b/qa/L0_shared_memory/test.sh index 23b5bb9675..27e8b2dd80 100755 --- a/qa/L0_shared_memory/test.sh +++ b/qa/L0_shared_memory/test.sh @@ -99,56 +99,46 @@ mkdir -p python_models/simple/1/ cp ../python_models/execute_delayed_model/model.py ./python_models/simple/1/ cp ../python_models/execute_delayed_model/config.pbtxt ./python_models/simple/ -for client_type in http grpc; do - SERVER_ARGS="--model-repository=`pwd`/python_models --log-verbose=1 ${SERVER_ARGS_EXTRA}" - SERVER_LOG="./unregister_shm.$client_type.server.log" - run_server - if [ "$SERVER_PID" == "0" ]; then - echo -e "\n***\n*** Failed to start $SERVER\n***" - cat $SERVER_LOG - exit 1 - fi +for test_case in \ + test_unregister_shm_during_inference_single_req \ + test_unregister_shm_during_inference_multiple_req \ + test_unregister_shm_after_inference; do + for client_type in http grpc; do + SERVER_ARGS="--model-repository=`pwd`/python_models --log-verbose=1 ${SERVER_ARGS_EXTRA}" + SERVER_LOG="./${test_case}_${client_type}.server.log" + run_server + if [ "$SERVER_PID" == "0" ]; then + echo -e "\n***\n*** Failed to start $SERVER\n***" + cat $SERVER_LOG + exit 1 + fi - export CLIENT_TYPE=$client_type - CLIENT_LOG="./unregister_shm_during_inference_$client_type.client.log" - set +e - python3 $SHM_TEST TestSharedMemoryUnregister.test_unregister_shm_during_inference_$client_type >>$CLIENT_LOG 2>&1 - if [ $? -ne 0 ]; then - cat $CLIENT_LOG - echo -e "\n***\n*** Test Failed\n***" - RET=1 - else - check_test_results $TEST_RESULT_FILE 1 + export CLIENT_TYPE=$client_type + CLIENT_LOG="./${test_case}_${client_type}.client.log" + set +e + python3 $SHM_TEST "TestSharedMemoryUnregister.${test_case}_${client_type}" >>"$CLIENT_LOG" 2>&1 if [ $? -ne 0 ]; then - cat $TEST_RESULT_FILE - echo -e "\n***\n*** Test Result Verification Failed\n***" + cat $CLIENT_LOG + echo -e "\n***\n*** Test Failed - ${test_case}_${client_type}\n***" RET=1 + else + check_test_results $TEST_RESULT_FILE 1 + if [ $? 
-ne 0 ]; then + cat $TEST_RESULT_FILE + echo -e "\n***\n*** Test Result Verification Failed - ${test_case}_${client_type}\n***" + RET=1 + fi fi - fi - CLIENT_LOG="./unregister_shm_after_inference_$client_type.client.log" - python3 $SHM_TEST TestSharedMemoryUnregister.test_unregister_shm_after_inference_$client_type >>$CLIENT_LOG 2>&1 - if [ $? -ne 0 ]; then - cat $CLIENT_LOG - echo -e "\n***\n*** Test Failed\n***" - RET=1 - else - check_test_results $TEST_RESULT_FILE 1 + kill $SERVER_PID + wait $SERVER_PID if [ $? -ne 0 ]; then - cat $TEST_RESULT_FILE - echo -e "\n***\n*** Test Result Verification Failed\n***" + echo -e "\n***\n*** Test Server shut down non-gracefully\n***" RET=1 fi - fi - - kill $SERVER_PID - wait $SERVER_PID - if [ $? -ne 0 ]; then - echo -e "\n***\n*** Test Server shut down non-gracefully\n***" - RET=1 - fi - set -e + set -e done +done if [ $RET -eq 0 ]; then echo -e "\n***\n*** Test Passed\n***" From 043875501952dda7cfc68257be8bb7dd6ea3d2e5 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Mon, 18 Nov 2024 18:42:33 +0530 Subject: [PATCH 13/13] Add TODO comment --- src/shared_memory_manager.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/shared_memory_manager.h b/src/shared_memory_manager.h index 9af9abaade..0f13f8e167 100644 --- a/src/shared_memory_manager.h +++ b/src/shared_memory_manager.h @@ -72,6 +72,8 @@ class SharedMemoryManager { void* mapped_addr_; TRITONSERVER_MemoryType kind_; int64_t device_id_; + + // TODO (DLIS-7620): avoid explicit flag and use smart pointers bool awaiting_unregister_; };
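[Editor's note: illustrative sketch, not part of the patch series.] The gRPC tests above wait for the callback by decrementing a `time_out` counter in one-second sleeps. An event-based wait gives the same time bound without polling; this sketch uses only the standard library and the same callback shape defined in the test files:

    import threading
    from functools import partial

    done = threading.Event()
    user_data = []

    def callback(user_data, result, error):
        user_data.append(error if error else result)
        done.set()  # wake the waiter as soon as a result or error arrives

    # triton_client.async_infer(..., callback=partial(callback, user_data))

    # Wait up to 20 seconds, mirroring the time_out loop in the tests.
    if not done.wait(timeout=20):
        raise TimeoutError("inference did not complete within 20 seconds")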