forked from wengjjpaul/PythonSimpleDDS
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfgcg
88 lines (88 loc) · 4.83 KB
/
fgcg
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
[1mdiff --git a/PythonSimpleDDS/PythonSimpleDDS/PublisherService.py b/PythonSimpleDDS/PythonSimpleDDS/PublisherService.py[m
[1mindex 4a1c160..0260eb1 100644[m
[1m--- a/PythonSimpleDDS/PythonSimpleDDS/PublisherService.py[m
[1m+++ b/PythonSimpleDDS/PythonSimpleDDS/PublisherService.py[m
[36m@@ -7,11 +7,13 @@[m [mclass PublisherService:[m
self.port = aHostPort[m
self.mDDSFilter = aDDSFilter[m
def HostPublisherService(self):[m
[32m+[m[32m #Setup listening socket to hear from publishers[m
self.mServer.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)[m
self.mServer.bind(('', self.port))[m
self._startListening()[m
def _startListening(self):[m
while 1:[m
[32m+[m[32m #Incoming messages stored in message[m
message, address = self.mServer.recvfrom(8192)[m
messageSendFromClient = message.decode("utf-8")[m
messageParts = messageSendFromClient.split(',')[m
[36m@@ -20,13 +22,26 @@[m [mclass PublisherService:[m
if(tCommand):[m
if(tCommand == "publish"):[m
if(tTopic):[m
[31m- #remove command and topic from message[m
[32m+[m[32m #remove command and topic segment from message array[m
messageParts = messageParts[2:][m
#Make a comma separated string from list[m
messageToSend = ",".join(messageParts)[m
[32m+[m[32m #Clean up the message little[m
messageToSend.strip()[m
[32m+[m[32m #Get subscriber list for current topic[m
tSubscriberListForThisTopic = self.mDDSFilter.getSubscribers(tTopic)[m
[32m+[m[32m<<<<<<< HEAD[m
[32m+[m[32m<<<<<<< HEAD[m
[32m+[m[32m<<<<<<< HEAD[m
t = threading.Thread(target=self._publish, args = (self.mServer, messageToSend, tSubscriberListForThisTopic))[m
[32m+[m[32m=======[m
[32m+[m[32m=======[m
[32m+[m[32m>>>>>>> 76dff2824c2130a2b57b24ad469fea286f49c6bd[m
[32m+[m[32m=======[m
[32m+[m[32m>>>>>>> 76dff2824c2130a2b57b24ad469fea286f49c6bd[m
[32m+[m[32m #Begin thread to publish to all subscribers[m
[32m+[m[32m t = threading.Thread(target=_publish, args = (self.mServer, messageToSend, tSubscriberListForThisTopic))[m
[32m+[m[32m>>>>>>> 76dff2824c2130a2b57b24ad469fea286f49c6bd[m
#wont keep the thread up if main thread die[m
t.daemon = True[m
try:[m
[36m@@ -36,7 +51,10 @@[m [mclass PublisherService:[m
else:[m
pass[m
def _publish(self, aServer, aMessage, aSubscriberList):[m
[32m+[m[32m #Convert String message into bytes[m
tMessageInBytes = bytes(aMessage, 'UTF-8')[m
[32m+[m[32m #Check if there are any subscribers[m
if(aSubscriberList):[m
for tSubscriber in aSubscriberList:[m
[32m+[m[32m #Send out the message using UDP using listening socket[m
aServer.sendto(tMessageInBytes, tSubscriber)[m
[1mdiff --git a/PythonSimpleDDS/PythonSimpleDDS/SubscribeService.py b/PythonSimpleDDS/PythonSimpleDDS/SubscribeService.py[m
[1mindex 64ef539..aad77bf 100644[m
[1m--- a/PythonSimpleDDS/PythonSimpleDDS/SubscribeService.py[m
[1m+++ b/PythonSimpleDDS/PythonSimpleDDS/SubscribeService.py[m
[36m@@ -2,21 +2,26 @@[m [mimport socket[m
[m
class SubscribeService:[m
def __init__(self, aDDSFilter, aHostIPV4, aHostPort):[m
[32m+[m[32m #Create a listening socket for SubscribeService[m
self.mServer = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)[m
self.host = aHostIPV4[m
self.port = aHostPort[m
self.mDDSFilter = aDDSFilter[m
[32m+[m[32m #Debug function[m
def _testFilter(self, what):[m
print(self.mDDSFilter.getSubscribers(what))[m
def hostSubscribeService(self):[m
self.mServer.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)[m
[32m+[m[32m #Bind to a single port for listening[m
self.mServer.bind((self.host, self.port))[m
self._startListening()[m
def _startListening(self):[m
while 1:[m
[32m+[m[32m #Incoming message stored in message[m
message, address = self.mServer.recvfrom(8192)[m
messageSendFromClient = message.decode("utf-8")[m
messageParts = messageSendFromClient.split(',')[m
[32m+[m[32m #Clean up the message[m
stripedMessageParts = [item.strip() for item in messageParts][m
if(stripedMessageParts):[m
if(stripedMessageParts[0] == "subscribe"):[m