1+ // Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
2+ //
3+ // Licensed under the BSD 3-Clause License (the "License"); you may not use this file
4+ // except in compliance with the License. You may obtain a copy of the License at
5+ //
6+ // https://opensource.org/licenses/BSD-3-Clause
7+ //
8+ // Unless required by applicable law or agreed to in writing, software distributed
9+ // under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
10+ // CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
11+ // language governing permissions and limitations under the License.
12+ //
13+
14+ #include < arpa/inet.h>
15+ #include < errno.h>
16+ #include < netinet/in.h>
17+ #include < signal.h>
18+ #include < stdlib.h>
19+ #include < string.h>
20+ #include < sys/socket.h>
21+ #include < unistd.h>
22+
23+ #include < chrono>
24+ #include < future>
25+ #include < iostream>
26+ #include < memory>
27+ #include < string>
28+ #include < thread>
29+
30+ #include " polaris/consumer.h"
31+
32+ class ConsumerServer {
33+ public:
34+ ConsumerServer (const std::string& host, int port, const polaris::ServiceKey& provider_service);
35+
36+ ~ConsumerServer ();
37+
38+ int Start ();
39+
40+ void Stop ();
41+
42+ private:
43+ std::string Proccess (const std::string& message);
44+
45+ int Send (const std::string& host, int port, const std::string& request, std::string& response);
46+
47+ private:
48+ std::string host_;
49+ int port_;
50+ polaris::ServiceKey provider_service_;
51+
52+ std::atomic<bool > stop_;
53+ std::unique_ptr<std::thread> accept_thread_;
54+
55+ std::unique_ptr<polaris::ConsumerApi> consumer_;
56+ };
57+
58+ bool signal_received = false ;
59+ void SignalHandler (int signum) {
60+ std::cout << " Interrupt signal (" << signum << " ) received." << std::endl;
61+ signal_received = true ;
62+ }
63+
64+ int main (int argc, char ** argv) {
65+ if (argc < 5 ) {
66+ std::cout << " usage: " << argv[0 ] << " host port service_namespace service_name" << std::endl;
67+ return -1 ;
68+ }
69+ // register signal handler
70+ signal (SIGINT, SignalHandler);
71+
72+ polaris::ServiceKey service_key = {argv[3 ], argv[4 ]};
73+ ConsumerServer server (argv[1 ], atoi (argv[2 ]), service_key);
74+
75+ // 启动服务
76+ if (server.Start () != 0 ) {
77+ return -2 ;
78+ }
79+
80+ // 循环等待退出信号
81+ while (!signal_received) {
82+ sleep (1 );
83+ }
84+
85+ // 反注册完成以后再停止服务
86+ server.Stop ();
87+
88+ return 0 ;
89+ }
90+
91+ ConsumerServer::ConsumerServer (const std::string& host, int port,
92+ const polaris::ServiceKey& provider_service)
93+ : host_(host), port_(port), provider_service_(provider_service), stop_(false ) {
94+ consumer_ = std::unique_ptr<polaris::ConsumerApi>(polaris::ConsumerApi::CreateWithDefaultFile ());
95+ }
96+
97+ ConsumerServer::~ConsumerServer () {}
98+
99+ int ConsumerServer::Start () {
100+ auto sock_listener = socket (AF_INET, SOCK_STREAM, 0 );
101+ if (sock_listener < 0 ) {
102+ std::cerr << " create socket with error: " << errno << std::endl;
103+ return -1 ;
104+ }
105+
106+ // address info to bind socket
107+ sockaddr_in server_addr;
108+ server_addr.sin_family = AF_INET;
109+ inet_pton (AF_INET, host_.c_str (), &server_addr.sin_addr );
110+ server_addr.sin_port = htons (port_);
111+
112+ // bind socket
113+ if (bind (sock_listener, (sockaddr*)&server_addr, sizeof (server_addr)) < 0 ) {
114+ std::cerr << " bind to " << host_ << " :" << port_ << " failed with error: " << errno
115+ << std::endl;
116+ close (sock_listener);
117+ return -2 ;
118+ }
119+
120+ // start listening
121+ if (listen (sock_listener, SOMAXCONN) < 0 ) {
122+ std::cerr << " listen to " << host_ << " :" << port_ << " failed with error: " << errno
123+ << std::endl;
124+ close (sock_listener);
125+ return -3 ;
126+ }
127+ std::cout << " listen to " << host_ << " :" << port_ << " success" << std::endl;
128+
129+ // create accept thread
130+ accept_thread_ = std::unique_ptr<std::thread>(new std::thread ([=] {
131+ while (!stop_) {
132+ fd_set set;
133+ FD_ZERO (&set);
134+ FD_SET (sock_listener, &set);
135+ struct timeval timeout;
136+ timeout.tv_sec = 2 ;
137+ timeout.tv_usec = 0 ;
138+ int ret = select (sock_listener + 1 , &set, NULL , NULL , &timeout);
139+ if (ret <= 0 ) {
140+ continue ;
141+ }
142+ sockaddr_in client_addr;
143+ socklen_t client_addr_size = sizeof (client_addr);
144+ int sock_client;
145+ if ((sock_client = accept (sock_listener, (sockaddr*)&client_addr, &client_addr_size)) < 0 ) {
146+ std::cerr << " accept connection failed with error:" << errno << std::endl;
147+ continue ;
148+ }
149+
150+ // 处理客户端连接
151+ std::async (std::launch::async, [=] {
152+ char buffer[1024 ];
153+ auto bytes = recv (sock_client, buffer, sizeof (buffer), 0 );
154+ if (bytes <= 0 ) {
155+ std::cerr << " received message failed: " << errno << std::endl;
156+ close (sock_client);
157+ return ;
158+ }
159+ std::string response = Proccess (buffer);
160+ bytes = send (sock_client, response.data (), response.size (), 0 );
161+ close (sock_client);
162+
163+ if (bytes < 0 ) {
164+ std::cerr << " send response failed: " << errno << std::endl;
165+ }
166+ });
167+ }
168+ close (sock_listener);
169+ }));
170+
171+ return 0 ;
172+ }
173+
174+ std::string ConsumerServer::Proccess (const std::string& message) {
175+ // 获取provider服务实例
176+ polaris::GetOneInstanceRequest instance_requst (provider_service_);
177+ polaris::Instance instance;
178+ auto ret_code = consumer_->GetOneInstance (instance_requst, instance);
179+ if (ret_code != polaris::kReturnOk ) {
180+ std::cout << " get one instance for service with error: "
181+ << polaris::ReturnCodeToMsg (ret_code).c_str () << std::endl;
182+ }
183+
184+ // 调用业务
185+ std::string response;
186+ auto begin_time = std::chrono::steady_clock::now ();
187+ int send_ret = Send (instance.GetHost (), instance.GetPort (), message, response);
188+ auto end_time = std::chrono::steady_clock::now ();
189+
190+ // 上报调用结果
191+ polaris::ServiceCallResult result;
192+ result.SetServiceNamespace (provider_service_.namespace_ );
193+ result.SetServiceName (provider_service_.name_ );
194+ result.SetInstanceId (instance.GetId ());
195+ result.SetDelay (
196+ std::chrono::duration_cast<std::chrono::milliseconds>(end_time - begin_time).count ());
197+ result.SetRetCode (send_ret);
198+ result.SetRetStatus (send_ret >= 0 ? polaris::kCallRetOk : polaris::kCallRetError );
199+ if ((ret_code = consumer_->UpdateServiceCallResult (result)) != polaris::kReturnOk ) {
200+ std::cout << " update call result for instance with error:"
201+ << " msg:" << polaris::ReturnCodeToMsg (ret_code).c_str () << std::endl;
202+ }
203+
204+ if (send_ret) {
205+ response =
206+ " send msg to " + instance.GetHost () + " :" + std::to_string (instance.GetPort ()) + " failed" ;
207+ }
208+ std::cout << response << std::endl;
209+ return response;
210+ }
211+
212+ int ConsumerServer::Send (const std::string& host, int port, const std::string& request,
213+ std::string& response) {
214+ // create a socket
215+ int sock_fd = socket (AF_INET, SOCK_STREAM, 0 );
216+ if (sock_fd < 0 ) {
217+ std::cout << " create socket failed: " << errno << std::endl;
218+ return -1 ;
219+ }
220+
221+ sockaddr_in server_addr;
222+ server_addr.sin_family = AF_INET;
223+ inet_pton (AF_INET, host.c_str (), &server_addr.sin_addr );
224+ server_addr.sin_port = htons (port);
225+
226+ if (connect (sock_fd, (sockaddr*)&server_addr, sizeof (server_addr)) < 0 ) {
227+ std::cerr << " connection establish failed: " << errno << std::endl;
228+ close (sock_fd);
229+ return -2 ;
230+ }
231+
232+ // send the message
233+ int bytes_send = send (sock_fd, request.data (), request.length (), 0 );
234+ if (bytes_send < 0 ) {
235+ std::cerr << " send message failed: " << errno << std::endl;
236+ close (sock_fd);
237+ return -3 ;
238+ }
239+
240+ char buffer[4096 ];
241+ int bytes_recv = recv (sock_fd, &buffer, sizeof (buffer), 0 );
242+ if (bytes_recv <= 0 ) {
243+ std::cerr << " receive message failed: " << errno << std::endl;
244+ close (sock_fd);
245+ return -4 ;
246+ }
247+
248+ close (sock_fd);
249+ response = std::string (buffer);
250+ return 0 ;
251+ }
252+
253+ void ConsumerServer::Stop () {
254+ stop_ = true ;
255+ if (accept_thread_) {
256+ accept_thread_->join ();
257+ }
258+ }
0 commit comments