diff --git a/Code/Server/.gitignore b/Code/Server/.gitignore new file mode 100644 index 00000000..76da0017 --- /dev/null +++ b/Code/Server/.gitignore @@ -0,0 +1 @@ +__pycache__/** diff --git a/Code/Server/input_navigation.py b/Code/Server/input_navigation.py new file mode 100644 index 00000000..b66416bb --- /dev/null +++ b/Code/Server/input_navigation.py @@ -0,0 +1,176 @@ +#!/usr/bin/python3 +import argparse +import os +import platform +import sys + +if platform.system() != 'Linux': + print(f'The program does NOT run on {platform.system()}') + print('It requires evdev which is only available for Linux') + print('Exiting.') + exit(1) + +from evdev import InputDevice, categorize, ecodes +from Motor import * +from Buzzer import * + + +""" +https://core-electronics.com.au/tutorials/using-usb-and-bluetooth-controllers-with-python.html +https://python-evdev.readthedocs.io/en/latest/usage.html -- keyboard +$ python +Python 3.7.3 (default, Jul 25 2020, 13:03:44) +[GCC 8.3.0] on linux +>>> from evdev import ecodes +>>> ecodes.KEY_A +30 +>>> ecodes.KEY[103] +'KEY_UP' +""" + +def main(args): + """ + CLI program to navigate using a keyboard or controller + ex: + device /dev/input/event3, name "Dell Dell USB Keyboard", phys "usb-0000:01:00.0-1.1/input0" + device /dev/input/event2, name "Logitech K400", phys "usb-0000:01:00.0-1.1/input2:1" + device /dev/input/event1, name "Sony PLAYSTATION(R)3 Controller", phys "usb-0000:01:00.0-1.4/input0" + device /dev/input/event0, name "Sony PLAYSTATION(R)3 Controller Motion Sensors", phys "usb-0000:01:00.0-1.4/input0" + """ + cli = argparse.ArgumentParser(description="Navigate the Freenove robot with different inputs") + cli.add_argument('-c', '--controller', action="store_true", help="use a gamepad controller for input") + cli.add_argument('-k', '--keyboard', action="store_true", help="use the keyboard for input") + cli.add_argument('-i', '--interactive', action="store_true", help="interactively select the input device") + cli.add_argument('-d', '--debug', action="store_true", help="print debug output, use with '-i' to setup \ + a new device") + cli.add_argument('-v', '--verbose', action="count", default=0, help="print verbose output, \ + allows counting of verbosity (-vvv)") + + if len(sys.argv) == 1: + print() + cli.print_help() + print() + exit(3) + + opts = cli.parse_args(args) + debug = opts.debug + verbose = opts.verbose + interactive = opts.interactive + if opts.keyboard: + input_devices = ['Logitech K400', 'Dell Dell USB Keyboard'] + if opts.controller: + input_devices = ['Sony PLAYSTATION(R)3 Controller', '8Bitdo Zero GamePad'] + + input_dev = '' + event_files = os.listdir('/dev/input/') + for event in event_files: + if 'event' in event: + tmp = InputDevice(f'/dev/input/{event}') # the keyboards and/or controllers + if verbose >= 1 or interactive: + print(f'Found: {tmp}') + + if interactive: + answer = input("Would you like to use this input device? (y/n)\n") + if answer.lower() == 'yes' or answer.lower() == 'y': + input_dev = tmp + break + else: + for input_device in input_devices: + if input_device == tmp.name: + input_dev = tmp + print(f'Using {input_dev.name} for input') + break + + if not input_dev: + print('No input match found. Exiting.') + exit(1) + + if not debug: # used for setting up a new device, skip definitions below + # button code variables (add/change to suit your device) + if 'Sony PLAYSTATION(R)3 Controller' == input_dev.name: + aBtn = 305 + bBtn = 304 + xBtn = 307 + yBtn = 308 + + up = 544 + down = 545 + left = 546 + right = 547 + + select = 314 + start = 315 + PSBtn = enter = 316 + + r1 = 311 + r2 = 313 + l1 = 310 + l2 = 312 + + rjstick = 318 + ljstick = 317 + elif '8Bitdo Zero GamePad' == input_dev.name: + aBtn = 34 + bBtn = 36 + xBtn = 35 + yBtn = 23 + + up = 46 + down = 32 + left = 18 + right = 33 + + start = 24 + select = 49 + + l1 = 37 + r1 = 50 + elif 'Logitech K400' == input_dev.name or 'Dell Dell USB Keyboard' == input_dev.name: + up = 103 + down = 108 + left = 105 + right = 106 + + enter = 28 + else: + print('No valid input device found. Exiting.') + exit(1) + + PWM=Motor() + buzzer = Buzzer() + + try: + #evdev takes care of polling the controller in a loop + for event in input_dev.read_loop(): + if event.type == ecodes.EV_KEY: # filters by event type + if verbose == 1 or debug: + print(event) + + if not debug: + if event.value == 0: + PWM.setMotorModel(0,0,0,0) + buzzer.run('0') + elif event.value == 1: + if event.code == up: + PWM.setMotorModel(2000,2000,2000,2000) + elif event.code == down: + PWM.setMotorModel(-2000,-2000,-2000,-2000) + elif event.code == left: + PWM.setMotorModel(-2000,-2000,2000,2000) + elif event.code == right: + PWM.setMotorModel(2000,2000,-2000,-2000) + elif event.code == enter: + buzzer.run('1') + + if verbose == 2: + print(categorize(event)) # more verbose output + + except KeyboardInterrupt: + print(' Exiting.') + +if __name__ == "__main__": + try: + main(sys.argv[1:]) + except KeyboardInterrupt: + print(' Exiting.') + diff --git a/Code/Server/main.py b/Code/Server/main.py index 6a94df7c..3d98a160 100644 --- a/Code/Server/main.py +++ b/Code/Server/main.py @@ -22,6 +22,7 @@ def __init__(self): self.start_tcp=False self.TCP_Server=Server() self.parseOpt() + if self.user_ui: self.app = QApplication(sys.argv) super(mywindow,self).__init__() @@ -34,8 +35,9 @@ def __init__(self): self.Button_Server.clicked.connect(self.on_pushButton) self.pushButton_Close.clicked.connect(self.close) self.pushButton_Min.clicked.connect(self.windowMinimumed) - + if self.start_tcp: + print ("Starting TCP Server") self.TCP_Server.StartTcpServer() self.ReadData=Thread(target=self.TCP_Server.readdata) self.SendVideo=Thread(target=self.TCP_Server.sendvideo) @@ -46,9 +48,12 @@ def __init__(self): if self.user_ui: self.label.setText("Server On") self.Button_Server.setText("Off") - + + print("Ready, waiting for commands...") + def windowMinimumed(self): self.showMinimized() + def mousePressEvent(self, event): if event.button()==Qt.LeftButton: self.m_drag=True @@ -66,11 +71,11 @@ def mouseReleaseEvent(self, QMouseEvent): def parseOpt(self): self.opts,self.args = getopt.getopt(sys.argv[1:],"tn") for o,a in self.opts: - if o in ('-t'): - print ("Open TCP") - self.start_tcp=True - elif o in ('-n'): + if '-n' in o: self.user_ui=False + + if '-t' in o: + self.start_tcp=True def close(self): try: @@ -80,21 +85,24 @@ def close(self): except: pass try: + print ("Stopping TCP Server") self.TCP_Server.server_socket.shutdown(2) self.TCP_Server.server_socket1.shutdown(2) self.TCP_Server.StopTcpServer() except: pass - print ("Close TCP") + if self.user_ui: QCoreApplication.instance().quit() + + print("Exiting.\n") os._exit(0) + def on_pushButton(self): if self.label.text()=="Server Off": self.label.setText("Server On") self.Button_Server.setText("Off") - self.TCP_Server.tcp_Flag = True - print ("Open TCP") + print ("Starting TCP Server") self.TCP_Server.StartTcpServer() self.SendVideo=Thread(target=self.TCP_Server.sendvideo) self.ReadData=Thread(target=self.TCP_Server.readdata) @@ -106,23 +114,26 @@ def on_pushButton(self): elif self.label.text()=='Server On': self.label.setText("Server Off") self.Button_Server.setText("On") - self.TCP_Server.tcp_Flag = False try: stop_thread(self.ReadData) stop_thread(self.power) stop_thread(self.SendVideo) except: pass - self.TCP_Server.StopTcpServer() - print ("Close TCP") + print ("Stopping TCP Server") + self.TCP_Server.StopTcpServer() + + if __name__ == '__main__': myshow=mywindow() - if myshow.user_ui==True: - myshow.show(); + if myshow.user_ui: + myshow.show() sys.exit(myshow.app.exec_()) else: try: - pass + while(True): + pass except KeyboardInterrupt: + print(" caught ctrl-c") myshow.close() diff --git a/Code/Server/server.py b/Code/Server/server.py index 68be22d5..ac60da28 100644 --- a/Code/Server/server.py +++ b/Code/Server/server.py @@ -6,7 +6,7 @@ import time import picamera import fcntl -import sys +import sys import threading from Motor import * from servo import * @@ -31,19 +31,22 @@ def __init__(self): self.adc=Adc() self.light=Light() self.infrared=Line_Tracking() - self.tcp_Flag = True + self.tcp_Flag=True self.sonic=False self.Light=False self.Mode = 'one' self.endChar='\n' self.intervalChar='#' + def get_interface_ip(self): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) return socket.inet_ntoa(fcntl.ioctl(s.fileno(), 0x8915, struct.pack('256s',b'wlan0'[:15]) )[20:24]) + def StartTcpServer(self): + self.tcp_Flag=True HOST=str(self.get_interface_ip()) self.server_socket1 = socket.socket() self.server_socket1.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEPORT,1) @@ -55,14 +58,18 @@ def StartTcpServer(self): self.server_socket.listen(1) print('Server address: '+HOST) - def StopTcpServer(self): + self.tcp_Flag=False try: self.connection.close() self.connection1.close() except Exception as e: - print ('\n'+"No client connection") - + print ("No client connection") + + self.led.colorWipe(led.strip, Color(0,0,0),10) # clear the leds if they were selected + self.servo.setServoPwm('0',90) # return the servos to their starting position + self.servo.setServoPwm('1',90) + def Reset(self): self.StopTcpServer() self.StartTcpServer() @@ -70,8 +77,10 @@ def Reset(self): self.ReadData=Thread(target=self.readdata) self.SendVideo.start() self.ReadData.start() + def send(self,data): - self.connection1.send(data.encode('utf-8')) + self.connection1.send(data.encode('utf-8')) + def sendvideo(self): try: self.connection,self.client_address = self.server_socket.accept() @@ -81,20 +90,20 @@ def sendvideo(self): self.server_socket.close() try: with picamera.PiCamera() as camera: - camera.resolution = (400,300) # pi camera resolution + camera.resolution = (400,300) # pi camera resolution camera.framerate = 15 # 15 frames/sec time.sleep(2) # give 2 secs for camera to initilize start = time.time() stream = io.BytesIO() # send jpeg format video stream - print ("Start transmit ... ") + print ("Start video transmit ... ") for foo in camera.capture_continuous(stream, 'jpeg', use_video_port = True): try: self.connection.flush() stream.seek(0) b = stream.read() length=len(b) - if length >5120000: + if length > 5120000: continue lengthBin = struct.pack('L', length) self.connection.write(lengthBin) @@ -103,7 +112,7 @@ def sendvideo(self): stream.truncate() except Exception as e: print(e) - print ("End transmit ... " ) + print ("End video transmit ... " ) break except: #print "Camera unintall" @@ -132,17 +141,16 @@ def readdata(self): try: try: self.connection1,self.client_address1 = self.server_socket1.accept() - print ("Client connection successful !") + print ("Client connection successful!") except: - print ("Client connect failed") + print ("Client connect failed!") restCmd="" self.server_socket1.close() while True: try: AllData=restCmd+self.connection1.recv(1024).decode('utf-8') except: - if self.tcp_Flag: - self.Reset() + self.Reset() break print(AllData) if len(AllData) < 5: @@ -260,7 +268,8 @@ def readdata(self): pass except Exception as e: print(e) - self.StopTcpServer() + self.StopTcpServer() + def sendUltrasonic(self): if self.sonic==True: ADC_Ultrasonic=self.ultrasonic.get_distance() @@ -271,6 +280,7 @@ def sendUltrasonic(self): self.sonic=False self.ultrasonicTimer = threading.Timer(0.13,self.sendUltrasonic) self.ultrasonicTimer.start() + def sendLight(self): if self.Light==True: ADC_Light1=self.adc.recvADC(0) @@ -281,6 +291,7 @@ def sendLight(self): self.Light=False self.lightTimer = threading.Timer(0.17,self.sendLight) self.lightTimer.start() + def Power(self): while True: ADC_Power=self.adc.recvADC(2)*3 @@ -299,5 +310,8 @@ def Power(self): time.sleep(0.1) else: self.buzzer.run('0') + + if __name__=='__main__': - pass + print("This file isn't meant to be invoked on the cli") + print("Please run 'sudo python main.py' instead.") diff --git a/Code/Server/test.py b/Code/Server/test.py index fc589691..b82c3588 100644 --- a/Code/Server/test.py +++ b/Code/Server/test.py @@ -10,11 +10,10 @@ def test_Led(): led.ledIndex(0x10,0,255,255) #cyan-blue led.ledIndex(0x20,0,0,255) #blue led.ledIndex(0x40,128,0,128) #purple - led.ledIndex(0x80,255,255,255) #white''' - print ("The LED has been lit, the color is red orange yellow green cyan-blue blue white") - time.sleep(3) #wait 3s - led.colorWipe(led.strip, Color(0,0,0)) #turn off the light - print ("\nEnd of program") + led.ledIndex(0x80,255,255,255) #white + print ("The LED has been lit, the color is red orange yellow green cyan-blue blue purple white") + while True: + time.sleep(3) #wait 3s except KeyboardInterrupt: led.colorWipe(led.strip, Color(0,0,0)) #turn off the light print ("\nEnd of program")