-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
134 lines (112 loc) · 4.54 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/python3
"""
Simple application using the EscPosDecoder ESC/POS binary decoder class in its
processing loop.
The application opens a TCP/IP port to listen for incoming data.
The incoming data is fed to the decoder.
After no more data has been received, the data is decoded as whole block.
The whole data is re-transmit to the real printer via TCP/IP.
Furthermore, the decoded text is encoded as JSON message and
(TODO) forwarded to a webserver for further processing.
"""
import sys
import socket
import json
import argparse
from esc_pos_decoder import EscPosDecoder
def forward_to_printer(printer_hostname="printer", printer_port=9001, raw_data=b""):
completed = False
try:
# Create a TCP/IP socket
tx_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tx_sock.settimeout(3)
print(f"Connecting to real printer on {printer_hostname}:{printer_port}")
tx_sock.connect((printer_hostname, printer_port))
# Send data
print(f"Sending message '{raw_data}'")
tx_sock.sendall(raw_data)
completed = True
except ConnectionRefusedError:
print("Connection to printer refused")
pass
except socket.gaierror:
print("Connection to printed failed")
pass
finally:
print("Closing socket")
tx_sock.close()
return completed
def main(args):
parser = argparse.ArgumentParser()
parser.add_argument('-lp', '--listener-port', type=int, default=9100)
parser.add_argument('-ph', '--printer-hostname', type=str, default="printer")
parser.add_argument('-pp', '--printer-port', type=int, default=9100)
parser.add_argument('-v', '--verbose', action='count', default=0)
args = parser.parse_args()
print(f"listener port: {args.listener_port}")
print(f"printer hostname: {args.printer_hostname}")
print(f"printer port: {args.printer_port}")
# Create a TCP/IP socket
rx_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_hostname = "0.0.0.0" # public interface ("localhost" would be limited to local interface)
server_port = args.listener_port # raw networking printing protocol
# Bind the socket to the port
print(f"Starting webserver on {server_hostname}:{server_port}")
rx_sock.bind((server_hostname, server_port))
# Listen for incoming connections
rx_sock.listen(1)
while True:
# Wait for a TCP/IP connection
print("Waiting for connection")
connection, client_address = rx_sock.accept()
tx_message = {
"decoder_status": "unknown",
"printer_status": "unknown",
"receipt_content": {
"lines": ""
}
}
# Connection has been established
try:
print(f"Connection from {client_address[0]}:{client_address[1]}")
decoder = EscPosDecoder(args.verbose)
print(f"Decoder: {decoder}")
raw_data = b""
while True:
raw_data_chunk = connection.recv(16)
if raw_data_chunk:
# print(f"Received data: {raw_data_chunk}")
decoder.feed_bytes(raw_data_chunk)
raw_data = raw_data + raw_data_chunk
else:
print(f"No more data from {client_address[0]}:{client_address[1]}")
break
print("Out of decoding loop.")
# terminate decoder/parser
receipt_text = decoder.get_text()
print("-"*48)
print(receipt_text)
print("-"*48)
# forward data to the real printer and add status info
ret = forward_to_printer(printer_hostname=args.printer_hostname,
printer_port=args.printer_port,
raw_data=raw_data)
if ret:
print("Sent to printer")
tx_message["printer_status"] = "success"
else:
print("Failed to send to printer")
tx_message["printer_status"] = "error"
# parse the decoded receipt content further
tx_message["receipt_content"]["lines"] = receipt_text.strip().splitlines()
# TODO: send the JSON data to the webserver
print(json.dumps(tx_message, indent=4))
finally:
# Clean up the connection
connection.close()
if decoder:
print("Deleting decoder object")
del decoder
if __name__ == '__main__':
import sys
main(sys.argv[1:])