-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathFISH2_functions.py
2467 lines (2205 loc) · 128 KB
/
FISH2_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#Python 3 package to perform an experiment on ROBOFISH.
#Date: 03 April 2017
#Author: Lars E. Borm
#E-mail: [email protected] or [email protected]
#Python version: 3.5.1
## CONTENT ##
# Hardware address
# Class initiation
# Experimental parameters management
# Hardware management
# System calibration (utility functions)
# Communication with user with push messages
# Fluid tracking (update buffers, notify user)
# Fluid handling, Lower level functions
# Function decorator
# Fluid handling, Higher level functions
# Temperature control
# Imaging functions
# Error checking
# Scheduler
################################################################################
# DEPENDENCIES
import serial
from serial.tools import list_ports
from collections import deque
import time
from datetime import datetime, timedelta
import numpy as np
from functools import wraps
from tkinter import *
import pickle
import shutil
# FISH peripherals
import FISH2_peripherals as perif
import sqlite3
# HARDWARE
# Syringe Pump
import tecanapi
import transport
import syringe
import models
#Syringe pump models
try:
from models import XE1000
except ModuleNotFoundError:
print('Module XE1000 not found. Ignore if not in use.')
try:
from models import XCalibur
except ModuleNotFoundError:
print('Module XCalibur not found. Ignore if not in use.')
## MX Valve
import MXII_valve
## Thermo Cube
try:
import ThermoCube
except ModuleNotFoundError:
print('Module ThermoCube not found. Ignore if machine is not in use.')
## Oasis
try:
import Oasis
except ModuleNotFoundError:
print('Module Oasis not found. Ignore if machine is not in use.')
## Yoctopuce Thermistor
try:
import YoctoThermistor_FISH
except ModuleNotFoundError:
print('Module YoctoThermisto_FISH not found. Ignore if machine is not in use.')
## TC-720 temperature controller
try:
import Py_TC720
except ModuleNotFoundError:
print('Module Py_TC720 not found. Ignore if machine is not in use.')
#=============================================================================
# Hardware address
#=============================================================================
def find_address(identifier = None):
    """
    Find the address of a serial device.

    The address is found either with an identifier given by the user, or
    interactively by unplugging and re-plugging the device and looking for
    the port that newly appears.

    Input:
    `identifier`(str): Any attribute of the connection. Usually USB to Serial
        converters use an FTDI chip. These chips store a number of attributes
        like: name, serial number or manufacturer. This can be used to
        identify a serial connection as long as it is unique. See the pyserial
        list_ports.grep() function for more details. If None, or if nothing
        matches, the manual unplug/plug search is used.
    Returns:
    The function prints the address and serial number of the device.
    `port`(obj): Returns a pyserial port object. port.device stores the
        address. Returns None if the manual search finds nothing and the
        user declines to try again.
    Raises:
    Exception: If the identifier matches more than one device.
    """
    found = False
    if identifier is not None:
        port = list(list_ports.grep(identifier))
        if len(port) == 1:
            print('Device address: {}'.format(port[0].device))
            found = True
        elif len(port) == 0:
            print('No devices found using identifier: {}\n\nContinue with manually finding USB address...\n'.format(identifier))
        else:
            # Ambiguous identifier: list every candidate so the user can
            # pick a more specific one, then abort.
            print('{:15}| {:15} |{:15} |{:15} |{:15}'.format('Device', 'Name', 'Serial number', 'Manufacturer', 'Description') )
            for p in port:
                print('{:15}| {:15} |{:15} |{:15} |{:15}\n'.format(str(p.device), str(p.name), str(p.serial_number), str(p.manufacturer), str(p.description)))
            raise Exception("""The input returned multiple devices, see above.""")

    if not found:
        print('Performing manual USB address search.')
        while True:
            # The device is whichever port is present after plugging in
            # but was absent while unplugged.
            input(' Unplug the USB. Press Enter if unplugged...')
            before = list_ports.comports()
            input(' Plug in the USB. Press Enter if USB has been plugged in...')
            after = list_ports.comports()
            port = [i for i in after if i not in before]
            if port:
                break
            print(' No port found. Try again.\n')
            while True:
                # Normalise once so that "N" and "n" are treated the same,
                # both for validation and for the exit decision below.
                answer = input(' Do you want to try again? Y/N').lower()
                if answer in ('y', 'n'):
                    break
                print(' Invalid awnser, type Y for Yes, or N for No')
            if answer == 'n':
                return

    print('Device address: {}'.format(port[0].device))
    try:
        print('Device serial_number: {}'.format(port[0].serial_number))
    except Exception:
        print('Could not find serial number of device.')
    return port[0]
#=============================================================================
# Initiation
#=============================================================================
class FISH2():
"""
Class containing all functions for performing the experiment
(see: FISH2_peripherals.py for complementary functions)
"""
def __init__(self, db_path, imaging_output_folder, start_imaging_file_path, system_name='ROBOFISH'):
    """
    Initiate system: load the experiment data from the database and connect
    to and initialize every device that the user marked as active.

    This call is interactive: it blocks on input() until the user confirms
    that the system is ready.

    Input:
    `db_path`(str): Path to the database. Or new name if it does not exist.
        Suggested name: "FISH_System2_db.sqlite".
    `imaging_output_folder`(str): Path to where the images are saved.
    `start_imaging_file_path`(str): Path to the "start imaging" file. It is
        opened for writing here and its content is set to '0'.
    `system_name`(str): Name of the system, passed to the logger and used
        as prefix in push messages. Default: 'ROBOFISH'.
    """
    self.system_name = system_name
    self.L = perif.FISH_logger(system_name = self.system_name)
    # TODO (from original author): some check if the database does not exist yet?
    self.db_path = db_path
    print(self.db_path)
    self.imaging_output_folder = imaging_output_folder
    self.start_imaging_file_path = start_imaging_file_path
    # In-memory copies of the database tables; filled by
    # updateExperimentalParameters() below.
    self.Parameters = {}
    self.Volumes = {}
    self.Targets = {}
    self.Ports = {}
    self.Ports_reverse = {}
    self.Hybmix = {}
    # Names of the devices that were successfully initialized.
    self.devices = []
    self.Machines = {}
    self.Machine_identification = {}
    self.Fixed_USB_port = {}
    self.Operator_address = {}
    self.Padding = {}
    self.Alert_volume = {}
    self.target_temperature= [None, None]
    self.found_error = False
    self.L.logger.info(f'System name: {system_name}')
    self.L.logger.info(f'Path to database: {self.db_path}')
    self.L.logger.info(f'Path to imaging_output_folder: {imaging_output_folder}')
    self.L.logger.info(f'Path to start_imaging_file_path: {start_imaging_file_path}')
    #Make sure imaging start file is set to 0
    with open(self.start_imaging_file_path, 'w') as start_imaging_file:
        start_imaging_file.write('0')
    self.L.logger.info('Set "start_imaging_file" to zero.')
    # Ask user if hardware can be initialized and primed.
    input('''\nPress Enter if:
* Flowcell(s) is placed in stage and connected.
* Heating/cooling cables are connected.
* All buffers are in the correct position and connected.
* System is connected.
* Valves are open.
* Datafile is committed to the database using the user program.
* Optional temperature sensor is connected.\n''')
    self.L.logger.info('User confirmed system is ready for operation.')
    # Get all data from database (all tables, regardless of update flags)
    self.L.logger.info('Retrieving data from database.')
    self.updateExperimentalParameters(self.db_path, ignore_flags=True)
    self.L.logger.info('Successful retrieved data from database.')
    #Connecting to hardware
    self.L.logger.info('Connecting and initiating hardware.')
    #Find addresses
    self.L.logger.info(' Finding device addresses.')
    device_COM_port = self.deviceAddress()
    #Initiate all connected devices, connected is determined by the user
    # (self.Machines[<name>] == 1 means the user marked the device as in use).
    # initialize MXValve1
    if self.Machines['MXValve1'] == 1:
        if device_COM_port['MXValve1'] == None:
            self.L.logger.warning(' MXValve1 not connected (no address available).')
        else:
            self.MXValve1 = MXII_valve.MX_valve(address = device_COM_port['MXValve1'], ports=10, name = 'MXValve1', verbose = False)
            self.L.logger.info(' MXValve1 Initialized.')
            self.devices.append('MXValve1')
    else:
        self.L.logger.info(' MXValve1 not connected according to user. Ignore if not needed.')
    # initialize MXValve2
    if self.Machines['MXValve2'] == 1:
        if device_COM_port['MXValve2'] == None:
            self.L.logger.warning(' MXValve2 not connected (no address available).')
        else:
            self.MXValve2 = MXII_valve.MX_valve(address = device_COM_port['MXValve2'], ports=10, name = 'MXValve2', verbose = False)
            self.L.logger.info(' MXValve2 Initialized.')
            self.devices.append('MXValve2')
    else:
        self.L.logger.info(' MXValve2 not connected according to user. Ignore if not needed.')
    # initialize Temperature Controller 1 (TC_1) as a ThermoCube
    if self.Machines['ThermoCube1'] == 1:
        if device_COM_port['ThermoCube1'] == None:
            self.L.logger.warning(' ThermoCube_1 not connected (no address available).')
        else:
            self.TC_1 = ThermoCube.ThermoCube(address = device_COM_port['ThermoCube1'],
                                              name = 'ThermoCube_1')
            if self.TC_1.connected == True:
                self.L.logger.info(' Temperature controller TC_1 Initialized.')
                # Start at a setpoint of 20 (presumably degrees Celsius - confirm in setTemp).
                self.setTemp(20, 'Chamber1')
                self.devices.append('ThermoCube1')
            else:
                self.L.logger.warning(' Could not make a connection with ThermoCube_1.')
    else:
        self.L.logger.info(' ThermoCube1 not connected according to user. Ignore if not needed.')
    # initialize Temperature Controller 2 (TC_2) as a ThermoCube. Replace this with your temperature controller if needed.
    if self.Machines['ThermoCube2'] ==1:
        if device_COM_port['ThermoCube2'] == None:
            self.L.logger.warning(' ThermoCube_2 not connected (no address available).')
        else:
            self.TC_2 = ThermoCube.ThermoCube(address = device_COM_port['ThermoCube2'],
                                              name = 'ThermoCube_2')
            if self.TC_2.connected == True:
                self.L.logger.info(' Temperature controller TC_2 Initialized.')
                self.setTemp(20, 'Chamber2')
                self.devices.append('ThermoCube2')
            else:
                self.L.logger.warning(' Could not make a connection with ThermoCube_2.')
    else:
        self.L.logger.info(' ThermoCube2 not connected according to user. Ignore if not needed.')
    # initialize Temperature Controller 1 (TC_1) as an Oasis.
    # NOTE(review): this binds the same attribute self.TC_1 as the ThermoCube1
    # branch above; if both are enabled, this later assignment wins.
    if self.Machines['Oasis1'] == 1:
        if device_COM_port['Oasis1'] == None:
            self.L.logger.warning(' Oasis_1 not connected (no address available).')
        else:
            self.TC_1 = Oasis.Oasis(address = device_COM_port['Oasis1'],
                                    name = 'Oasis_1')
            if self.TC_1.connected == True:
                self.L.logger.info(' Temperature controller TC_1 Initialized.')
                self.setTemp(20, 'Chamber1')
                self.devices.append('Oasis1')
            else:
                self.L.logger.warning(' Could not make a connection with Oasis_1.')
    else:
        self.L.logger.info(' Oasis1 not connected according to user. Ignore if not needed.')
    # initialize Temperature Controller 2 (TC_2) as an Oasis. Replace this with your temperature controller if needed.
    # NOTE(review): shares self.TC_2 with the ThermoCube2 branch above.
    if self.Machines['Oasis2'] ==1:
        if device_COM_port['Oasis2'] == None:
            self.L.logger.warning(' Oasis_2 not connected (no address available).')
        else:
            self.TC_2 = Oasis.Oasis(address = device_COM_port['Oasis2'],
                                    name = 'Oasis_2')
            if self.TC_2.connected == True:
                self.L.logger.info(' Temperature controller TC_2 Initialized.')
                self.setTemp(20, 'Chamber2')
                self.devices.append('Oasis2')
            else:
                self.L.logger.warning(' Could not make a connection with Oasis_2.')
    else:
        self.L.logger.info(' Oasis2 not connected according to user. Ignore if not needed.')
    # Initiate TC720
    if self.Machines['TC720'] == 1:
        if device_COM_port['TC720'] == None:
            self.L.logger.warning(' TC720 not connected (no address available).')
        else:
            print(device_COM_port['TC720'])
            self.TC720 = Py_TC720.TC720(address = device_COM_port['TC720'],
                                        name = 'TC720',
                                        mode = 0,
                                        control_type = 0,
                                        default_temp = 20,
                                        verbose = False)
            self.L.logger.info(' TC720 Initialized.')
            self.devices.append('TC720')
    else:
        self.L.logger.info(' TC720 not connected according to user. Ignore if not needed.')
    # initialize Yocto_Thermistor. It is located through its serial number
    # from Machine_identification, not through device_COM_port.
    if self.Machines['YoctoThermistor'] == 1:
        try:
            self.temp = YoctoThermistor_FISH.FISH_temperature_deamon(serial_number = self.Machine_identification['YoctoThermistor'], log_interval=10)
            self.L.logger.info(' YoctoThermistor Initialized.')
            self.temp.deamon_start()
            self.devices.append('YoctoThermistor')
        # SystemExit is caught so that a failing sensor does not end the program.
        except SystemExit as e:
            self.L.logger.info(' YoctoThermistor not connected, temperature functions will not work!')
            self.L.logger.info(' Error code: {}'.format(e))
    else:
        self.L.logger.info(' YcotoThermistor not connected according to user. Ignore if not needed.')
    # Initialize syringe pump CavroXE1000.
    # The pump address comes from findSerialPumps(); the first pump found is used.
    if self.Machines['CavroXE1000'] == 1:
        self.L.logger.info(' Connecting with CavroXE1000.')
        pump_address = transport.TecanAPISerial.findSerialPumps()
        self.pump = models.XE1000(com_link =transport.TecanAPISerial(0, pump_address[0][0] , 9600 ))
        self.L.logger.info(' Established connection with CavroXE1000.')
        self.L.logger.info(' Initiating CavroXE1000.')
        self.connectPort('Waste') #Change to waste port
        self.pump.init(in_port='right', init_speed = 20)
        self.pump.setSpeed(speed=400, execute = True)
        self.pump.setBacklashSteps(5, execute = True)
        self.L.logger.info(' CavroXE1000 Initialized.')
        self.devices.append('CavroXE1000')
    else:
        self.L.logger.info(' CavroXE1000 not connected according to user. Ignore if not needed.')
    # Initiate Syringe pump CavroXCalibur.
    # NOTE(review): binds the same attribute self.pump as the XE1000 branch,
    # and unlike the other devices there is no else-branch, so nothing is
    # logged when the XCalibur is not in use.
    if self.Machines['CavroXCalibur'] == 1:
        self.L.logger.info(' Connecting with CavroXCalibur')
        pump_address = transport.TecanAPISerial.findSerialPumps()
        self.connectPort('Waste') #Change to waste port
        self.pump = models.XCalibur(com_link= transport.TecanAPISerial(0, pump_address[0][0] , 9600 ),
                                    syringe_ul=2500,
                                    direction='Z',
                                    microstep=False,
                                    slope=14,
                                    speed=22,
                                    debug=False,
                                    debug_log_path='.')
        self.L.logger.info(' Established connection with CavroXCalibur.')
        self.L.logger.info(' Initiating CavroXCalibur.')
        self.pump.init(speed=16,
                       direction='Z') #Z = Input left, output right. Y = Input right, output left
        self.pump.setSpeed(22)
        self.L.logger.info(' CavroXCalibur initiated.')
        self.devices.append('CavroXCalibur')
    #Update active machines
    self.L.logger.info(' Connected devices: {}\n'.format(self.devices))
    # Set them to 0 or 1 in the local dictionary and the database, so that
    # a device that was requested but failed to initialize ends up as 0.
    for m in ['MXValve1', 'MXValve2', 'YoctoThermistor', 'CavroXE1000', 'CavroXCalibur', 'ThermoCube1', 'ThermoCube2', 'Oasis1', 'Oasis2', 'TC720']:
        if m in self.devices:
            print('Active machine: {}'.format(m))
            self.Machines[m] = 1
            perif.updateValueDB(self.db_path, 'Machines', column = m, new_value = 1)
        else:
            self.Machines[m] = 0
            perif.updateValueDB(self.db_path, 'Machines', column = m, new_value = 0)
    # Write them to the yaml info file.
    perif.DBToYaml(self.db_path)
    # Log all the parameters
    self.L.logger.info( 'Start parameters:\n'+''.join(['{}: {}\n'.format(i, self.Parameters[i]) for i in self.Parameters]))
    self.L.logger.info( 'Ports:\n'+''.join(['{}: {}\n'.format(i, self.Ports[i]) for i in self.Ports]))
    self.L.logger.info( 'Active machines:\n'+''.join(['{}: {}\n'.format(i, self.Machines[i]) for i in self.Machines]))
    self.L.logger.info( 'Machine_identification:\n'+''.join(['{}: {}\n'.format(i, self.Machine_identification[i]) for i in self.Machine_identification]))
    self.L.logger.info( 'Fixed_USB_port:\n'+''.join(['{}: {}\n'.format(i, self.Fixed_USB_port[i]) for i in self.Fixed_USB_port]))
    self.L.logger.info( 'Padding:\n'+''.join(['{}: {}\n'.format(i, self.Padding[i]) for i in self.Padding]))
    self.L.logger.info('____')
    self.L.logger.info('System ready for operation.')
    self.L.logger.info('____')
#=============================================================================
# Experimental parameters management
#=============================================================================
def updateExperimentalParameters(self, db_path, ignore_flags=False, verbose=False):
    """
    Copy database tables into memory, waiting first while the experiment
    is paused.

    Each table is stored on the instance under its own name: Parameters,
    Volumes, Targets, Ports (plus the inverted Ports_reverse), Hybmix,
    Machines, Machine_identification, Fixed_USB_port, Operator_address,
    Padding and Alert_volume. A raised "<table>_flag" in the Flags table
    indicates new values for that table.

    Input:
    `db_path`(str): Full path to database.
    `ignore_flags`(bool): Option to ignore the flags and update everything.
        If False, it only updates the Tables with new information and
        removes their flags. Pause flag is not ignored.
    `verbose`(bool): If True, print a message once no Pause flag is set.
    """
    # Tables in the order in which they are refreshed.
    table_names = ('Parameters', 'Volumes', 'Targets', 'Ports', 'Hybmix',
                   'Machines', 'Machine_identification', 'Fixed_USB_port',
                   'Operator_address', 'Padding', 'Alert_volume')

    def load(table):
        # Read one table and store it on the instance under its own name.
        rows = perif.returnDictDB(db_path, table)
        # 'Targets' is kept as returned; for every other table only the
        # first element is used.
        content = rows if table == 'Targets' else rows[0]
        setattr(self, table, content)
        if table == 'Ports':
            # Keep the inverted lookup in sync with Ports.
            self.Ports_reverse = {v:k for k,v in self.Ports.items()}

    # Wait as long as the Pause flag is up, polling every 10 seconds.
    polls = 0
    while perif.returnDictDB(db_path, 'Flags')[0]['Pause_flag'] != 0:
        polls += 1
        # Every 120 polls (20 minutes) remind the user; polls/6 = minutes.
        if polls % 120 == 0:
            print('Database blocked. Waiting for user to finish updating datafile. {} minutes'.format((polls*(1/6))))
        time.sleep(10)
    if verbose == True:
        print('No Pause flag, retrieving data from database.\n')

    if ignore_flags == True:
        # Ignore the flags that indicate change and update everything.
        for table in table_names:
            load(table)
    else:
        # Only refresh the tables whose flag is up, and lower that flag.
        flags = perif.returnDictDB(db_path, 'Flags')[0]
        for table in table_names:
            flag_name = table + '_flag'
            if flags[flag_name] == 1:
                load(table)
                perif.removeFlagDB(db_path, flag_name)
def getHybmixCode(self, target, cycle, indirect):
    """Get the code for the Hybridization mix.

    The code is built as "<chamber>_<cycle>" with the cycle zero-padded to
    two digits, e.g. 'C1_03', optionally followed by an indirect-labeling
    suffix, e.g. 'C1_03_B'.

    Input:
    `target`(str): Target chamber. Like: 'Chamber1' (case-insensitive).
    `cycle`(int): Current cycle of experiment.
    `indirect`(str or None): Optional when indirect labeling is used
        (case-insensitive). Pass "A" for encoding probes, "B" for amplifiers
        or "C" for detection probes. Pass None to get the code without
        suffix.
    Returns:
    `Hybmix_code`: Code that the user has put in the info file.
    Raises:
    Exception: If `target` or `indirect` is not one of the known values.
    """
    #Get code for Chamber
    chamber = {'chamber1': 'C1_', 'chamber2': 'C2_'}.get(target.lower())
    if chamber is None:
        raise Exception ('Unknown Target: "{}". Choose "Chamber1" or "Chamber2"'.format(target))

    #Make Hybmix code, cycle number as 2 character string
    Hybmix_code = chamber + str(cycle).zfill(2)

    #Direct labeling: no suffix.
    if indirect is None:
        return Hybmix_code

    #Append indirect letter if indirect protocol is run.
    suffix = {'a': '_A', 'b': '_B', 'c': '_C'}.get(indirect.lower())
    if suffix is None:
        raise Exception ('Unknown Indirect labeling indicator: {}, Choose "A" for encoding probes, "B" for amplifiers or "C" for detection probes'.format(indirect))
    return Hybmix_code + suffix
def getHybmixPort(self, Hybmix_code):
"""Returns the port beloning to a certain hybridization mix.
Input:
`Hybmix_code` (str): Identifier of the hybridization mix.
Can be obtained from self.getHybmixCode().
Returns:
`Hybmix_port` (str): Port number of the required port.
"""
#Test if Hybridization mix is placed in the system by the user.
while True:
try:
Hybmix_port_reverse = {v:k for k,v in self.Hybmix.items()}
Hybmix_port = Hybmix_port_reverse[Hybmix_code]
break
except KeyError as e:
print('Right Hybridization mix is not connected. Add {} to system. KeyError: {}'.format(Hybmix_code, e))
print('Note: if the mensioned Hybmix is already dispensed, you probably told the program to wash both after dispensing the Hybmix and in the sceduler. Set one of them to False.')
self.push(short_message='Hybmix not connected',
long_message= 'Please place Hybmix {} in system and add it to the "Hybmix" table in the datafile'.format(Hybmix_code))
input('Press enter if {} is placed and added to the "Hybmix" table in the datafile...'.format(Hybmix_code))
self.updateExperimentalParameters(self.db_path, ignore_flags=True)
return Hybmix_port
#=============================================================================
# Hardware management
#=============================================================================
def deviceAddress(self):
    """
    Find the communication port of every supported device.

    Devices are first matched on the serial number reported by the serial
    port against the identifiers in self.Machine_identification. Devices
    that are still without address get the static address from
    self.Fixed_USB_port, if one is given (anything but the string 'None').
    To find the identifier of a device use the find_address() function
    and add the identifier to the user FISH_System_datafil.yaml

    Returns:
    `device_COM_port` (dict): Dictionary with name of device and COM port,
        or None for devices for which no address was found.
    Devices need to be added to the database using the FISH2_user_program and
    the FISH_system_datafile.yaml
    """
    machine_names = ('CavroXE1000', 'CavroXCalibur', 'MXValve1', 'MXValve2',
                     'ThermoCube1', 'ThermoCube2', 'Oasis1', 'Oasis2',
                     'YoctoThermistor', 'TC720')
    device_COM_port = dict.fromkeys(machine_names)

    #Invert the Machine_identification to get the machine name by identifier
    machine_by_serial = {}
    ports_info = serial.tools.list_ports.comports()
    for name, serial_id in self.Machine_identification.items():
        machine_by_serial[serial_id] = name

    #Try if machine can be found using the given identifiers.
    for port in ports_info:
        try:
            corresponding_machine = machine_by_serial[port.serial_number]
            device_COM_port[corresponding_machine] = port.device
        except Exception:
            # Port does not belong to a known machine: skip it.
            pass

    #For machines without an identification method, add the static address.
    for machine in list(device_COM_port):
        if device_COM_port[machine] is not None:
            continue
        if self.Fixed_USB_port[machine] != 'None':
            print(repr(self.Fixed_USB_port[machine]))
            device_COM_port[machine] = self.Fixed_USB_port[machine]
            print(device_COM_port[machine])
            print('Communication port for: {} added using a static address. Use with care. Do not move around USB cables without remapping the addresses.'.format(machine))
        elif self.Machines[machine] == 1:
            # The machine is marked as in use but has no address at all.
            print(f'Warning! No USB port information or serial identification code present for: {machine} add either to the Fixed_USB_port or Machine_identification table using the user program. Ignore if machine has its own identification method.')
    return device_COM_port
def getSerialPump_XCalibur(self):
    '''
    Adapted from the "XCaliburD" function in "gui_setup.py"
    In the "tecancavro" package, written by: Ben Pruitt & Nick Conway
    https://github.com/benpruitt/tecancavro

    Assumes that every pump found is a XCalibur pump and returns a list
    with one tuple per pump:
    (<serial port>, <instantiated XCalibur>)
    Each XCalibur is connected to its own serial port.
    '''
    pump_list = transport.TecanAPISerial.findSerialPumps()
    # Link each pump to the port it was found on (previously every pump
    # was linked to the port of the first pump in the list).
    return [(ser_port, XCalibur(com_link=transport.TecanAPISerial(0,
             ser_port, 9600))) for ser_port, _, _ in pump_list]
def getSerialPump_XE1000(self):
    '''
    Adapted from the "XCalibur" function in "gui_setup.py"
    In the "tecancavro" package, written by: Ben Pruitt & Nick Conway
    https://github.com/benpruitt/tecancavro

    Assumes that every pump found is a XE1000 pump and returns a list
    with one tuple per pump:
    (<serial port>, <instantiated XE1000>)
    '''
    pumps = []
    for ser_port, _, _ in transport.TecanAPISerial.findSerialPumps():
        # Address 0 on the port the pump was found on, 9600 baud.
        link = transport.TecanAPISerial(0, ser_port, 9600)
        pumps.append((ser_port, XE1000(com_link=link)))
    return pumps
#=============================================================================
# System calibration
#=============================================================================
def findPaddingVolumeChamber(self, volume, air_port , target):
    """
    Test a candidate padding volume between valve and chamber.

    An air bubble of 20ul is aspirated from `air_port` (make sure no fluid
    tube is connected to that port), followed by `volume` ul taken from the
    pump input. Both are then dispensed towards `target`. By watching where
    the air bubble ends up you can judge the padding volume: if the bubble
    is just out of the tube/on the correct position, the padding volume is
    correct and should be saved in the "FISH_System_datafile.yaml".

    Input:
    `volume`(int): Volume to test as padding volume
    `air_port`(str): Name of port that is disconnected. Like: 'P3'
    `target`(str): Target to push to. Like: 'Chamber1' or 'P2'
    """
    bubble_ul = 20
    input('This function will push a tiny air bubble to the hybridization chamber, if it just reaches it you have found the padding volume. Press Enter to continue...')

    # Take up the air bubble.
    self.connectPort(air_port)
    self.pump.extract(volume_ul=bubble_ul, from_port='output', execute=True)

    # Add the padding volume behind it and push everything to the target.
    self.connectPort(target)
    self.pump.extract(volume_ul=volume, from_port='input', execute=True)
    self.pump.dispense(volume_ul=volume + bubble_ul, to_port='output', execute=True)

    closing_note = 'If the padding volume of {}ul is correct, add it to the "FISH_System_datafile.yaml" for the target: {}. Press Enter to continue...'.format(volume, target)
    print(closing_note)
def findPaddingVolumeHYBmix(self, volume, hybmix):
    """
    Test a candidate padding volume between valve and HYBmix.

    Start with low volumes. The function aspirates `volume` ul from the
    hybmix port, resets the reservoir and aspirates another 150ul so you
    can inspect the reservoir: if there still is air between hybmix and
    running liquid, the given volume is not sufficient. After confirmation
    the tubes are emptied by aspirating 1000ul and resetting the reservoir.

    Input:
    `volume`(int): Volume to test as padding volume
    `hybmix`(str): Hybmix to aspirate. Like: 'HYB01'
    """
    def aspirate(volume_ul):
        # Connect to the hybmix port and aspirate from the pump output side.
        self.connectPort(hybmix)
        self.pump.extract(volume_ul=volume_ul, from_port='output', execute=True)

    input('Place some liquid (use same viscosity as real experiment) in the system. This function will aspirate the volume and you need to see if the liquid reaches the reservoir tube. The best is if it just reaches the reservoir. Press Enter to continue...')

    # Aspirate the candidate padding volume, then a visible test amount.
    aspirate(volume)
    self.resetReservoir(0)
    aspirate(150)
    print('Look at reservoir, if you see an air bubble you need to aspirate more.')
    input('Press enter to continue, this will empty the tubes...')

    # Empty the tubes again.
    self.resetReservoir(0)
    aspirate(1000)
    self.resetReservoir(200)

    closing_note = 'If the padding volume of {}ul is correct, add it to the "FISH_System_datafile.yaml" for the port: {}. Press Enter to continue...'.format(volume, hybmix)
    print(closing_note)
def findPaddingVolumeBuffer(self, volume, port, air_port):
    """
    Test a candidate padding volume between valve and a buffer.

    `volume` ul is aspirated from `port`. If you see the buffer after the
    air bubble in the reservoir, the volume is sufficient and should be
    saved in "FISH_System_datafile.yaml". Afterwards `volume` + 200 ul is
    taken from `air_port` and dispensed into `port`, to fill the tube with
    air again so the test can be repeated.

    Input:
    `volume`(int): Volume to test as padding volume
    `port`(str): Name of port of interest
    `air_port`(str): Port connected to air
    """
    refill_ul = volume + 200

    print('Make sure the tube of the buffer of interest is filled with air.')
    input('Press Enter to start and aspirate {}ul of the buffer. Please look at the buffer and note where it ends up. Press Enter to continue...'.format(volume))

    # Aspirate the candidate volume from the buffer of interest.
    self.extractBuffer(port, volume)
    print('Look at reservoir, if you see that the buffer passed the valve and is just in the reservoir tube, the volume is good.')
    input('Press enter to continue, this will fill the tube with air so you can perform the test again...')

    # Push air back into the buffer tube.
    self.resetReservoir(0)
    self.extractBuffer(air_port, refill_ul)
    self.connectPort(port)
    self.pump.dispense(volume_ul=refill_ul, to_port='output', execute=True)

    closing_note = 'If the padding volume of {}ul is correct, add it to the "FISH_System_datafile.yaml" for the port: {}. Press Enter to continue...'.format(volume, port)
    print(closing_note)
#=============================================================================
# Communication with user by push messages
#=============================================================================
def push(self, short_message='', long_message=''):
    """
    Send a push message to the operator via send_push() from peripherals.
    The subject is prefixed with the system name.
    Input:
    `short_message`(str): Subject of message.
    `long_message`(str): Body of message.
    """
    #Prefix the subject with the system name
    subject = '{}: {}'.format(self.system_name, short_message)
    recipient = self.Operator_address
    operator_name = self.Parameters['Operator']
    perif.send_push(recipient,
                    operator=operator_name,
                    short_message=subject,
                    long_message=long_message)
#=============================================================================
# Fluid tracking (update buffers, notify user)
#=============================================================================
def checkBuffer(self):
    """
    Check all tracked volumes and notify the Operator when action is needed.
    A buffer is reported when its volume is below its Alert_volume; the
    Waste port is reported when its volume is above its Alert_volume.
    Entries in Volumes that are not numbers are skipped.
    One push message listing all reported ports is sent and printed.
    Raises an Exception if a port has a numeric volume but its name or
    Alert_volume is 'None'.
    """
    #Collect (port, volume) pairs that need attention
    flagged = []
    for port, vol in self.Volumes.items():
        #Do not check unconnected buffers (non-numeric volume)
        if not isinstance(vol, (int, float)):
            continue

        name = self.Ports[port]
        limit = self.Alert_volume[port]
        #A port with a volume must also have a name and an alert volume
        if name == 'None' or limit == 'None':
            raise Exception('Port {} is not correctly configured, a volume is listed but no name or no Alert volume is specified. Name: {}, Alert_volume: {}. Please update datafile.'.format(port, name, limit))

        #Waste alerts when too full, all other buffers when too empty
        if port == self.Ports_reverse['Waste']:
            needs_attention = vol > limit
        else:
            needs_attention = vol < limit
        if needs_attention:
            flagged.append((port, vol))

    #Nothing to report
    if not flagged:
        return

    #Compose message: header, one line per port, timestamp
    parts = ['Replace:\n']
    for port, vol in flagged:
        #Get the name of the buffer connected to the port
        label = port if port == 'RunningBuffer' else self.Ports[port]
        parts.append('{}: {}, current vol: {}ml\n'.format(port, label, round(vol/1000)))
    parts.append(time.strftime("%d-%m-%Y %H:%M:%S"))
    report = ''.join(parts)

    #Send message
    self.push(short_message='Buffer status', long_message=report)
    print(report)
def updateBuffer(self, target, volume, check=True):
    """
    Book a used volume against a buffer, in memory and in the database.
    The Waste port accumulates (volume is added); every other port is
    depleted (volume is subtracted).
    Input:
    `target`(str): Buffer name or port number
    `volume`(int): volume to book. in ul
    `check`(bool): Check if buffers need to be replaced, Default = True
    """
    #Find the port code of the buffer
    port = self.getPort(target, port_number=False)

    #Waste is summed, buffers are subtracted
    if port == self.Ports_reverse['Waste']:
        operation = '+'
        new_volume = self.Volumes[port] + volume
    else:
        operation = '-'
        new_volume = self.Volumes[port] - volume

    #Update the local table first, then mirror the change in the database
    self.Volumes[port] = new_volume
    perif.updateValueDB(self.db_path, 'Volumes', column=port, operation=operation, value=volume)

    #Check if buffers need replacement
    if check == True:
        self.checkBuffer()
#=============================================================================
# Fluid handling, Lower level functions
#=============================================================================
def getPort(self, target, port_number=False):
    """
    Resolve a target to its port code or port number.
    Input:
    `target`(str): Either the port code (P1-P20) or
        the buffer name as in the 'Ports' dictionary.
    `port_number`(bool): If False, returns the port code P1-P20.
        If True, returns the port number 1-20.
    Raises ValueError if `target` is neither a key nor a value of Ports.
    """
    #Check input: target must be a known port code or a known buffer name
    is_port_code = target in self.Ports
    if not is_port_code and target not in self.Ports.values():
        raise ValueError('Invalid target name: {}, Not present in Ports'.format(target))

    #Buffer names are translated through the reverse table
    port = target if is_port_code else self.Ports_reverse[target]

    #Return the port code.
    if port_number == False:
        return port
    #Return the port number (strip the leading letter)
    if port_number == True:
        return int(port[1:])
    return None
def connectPort(self, target):
    """
    Connect the reservoir to the target buffer by switching the valve(s).
    Port numbers up to 10 are selected on valve 1. Higher numbers are on
    valve 2: valve 1 is switched to the port named 'Valve2' and valve 2 is
    switched to the port number minus 10.
    Input:
    `target`(str): Either the port number (P1-P20) or
        the buffer name as in the 'Ports' dictionary.
    Raises ValueError if valve 2 is needed but 'Valve2' is not in Ports.
    """
    #Get the number of the port
    number = self.getPort(target, port_number=True)

    #Ports on valve 1 can be selected directly
    if number <= 10:
        self.MXValve1.change_port(number)
        return

    #Port is on valve 2: check if valve 2 is connected
    if 'Valve2' not in self.Ports.values():
        raise ValueError('Connection to "Valve2" is not defined. Add "Valve2" to the Ports table.')

    #Connect valve 1 with valve 2, then select the port on valve 2
    bridge_number = self.getPort('Valve2', port_number=True)
    self.MXValve1.change_port(bridge_number)
    self.MXValve2.change_port(number - 10)
def extractBuffer(self, buffer, volume, bubble = False, bubble_volume = 30):
    """
    Aspirate a volume of a buffer into the reservoir, optionally preceded
    by an air bubble.
    Input:
    `buffer`(str): Target buffer/port as in the 'Ports' dictionary.
    `volume`(int): Volume to aspirate.
    `bubble`(bool): If an air bubble should be aspirated first. Default = False
    `bubble_volume`(int): Volume of bubble. Default 30ul.
    """
    #Optional air bubble goes in before the buffer
    with_bubble = bubble == True
    if with_bubble:
        self.airBubble(bubble_volume=bubble_volume)

    #Change to port of buffer and aspirate
    self.connectPort(buffer)
    aspirate = self.pump.extract
    aspirate(volume_ul=volume, from_port='output', execute=True)
def dispenseBuffer(self, target, volume):
    """
    Push a volume out of the reservoir to the target port.
    Presumes that some buffer was extracted first, with a volume not larger
    than the dispense volume.
    Input:
    `target`(str): target buffer/port as in the 'ports' dictionary.
    `volume`(int): volume in ul to dispense.
    """
    #Select the destination, then dispense
    self.connectPort(target)
    push_out = self.pump.dispense
    push_out(volume_ul=volume, to_port='output', execute=True)
def padding(self, target):
    """
    Aspirate the padding volume for a target from the pump input, to bridge
    the tubing from reservoir to target so that the buffer reaches the target.
    Input:
    `target`(str): Target to dispense to, either the buffer name as in
        Ports, or the port code.
    Returns the padding volume that was aspirated.
    Raises whatever getPort raises for an unknown target.
    """
    #Resolve the target once; Padding is keyed by port code
    port = self.getPort(target)
    volume = self.Padding[port]
    #Padding liquid is taken from the pump's input side, not through the valve
    self.pump.extract(volume_ul=volume, from_port='input', execute=True)
    return volume
def airBubble(self, bubble_volume = 30): #Not advised to use air bubble
    """
    Aspirate an air bubble into the reservoir. To separate running liquid
    from the buffer to dispense.
    Input:
    `bubble_volume`(int): size of the bubble in ul. Default = 30.
        volume should be between 10 and 50 ul.
        10ul might be unstable. 30 works consistent
    Raises ValueError if `bubble_volume` is outside 10-50 ul.
    """
    #in the 1/8"OD, 0.62" tube of the reservoir, 1mm holds 1.94778 ul
    if not 10 <= bubble_volume <= 50:
        raise ValueError('`bubble_volume` must be between 10 and 50, got {}'.format(bubble_volume))

    self.connectPort('Air') #Change to air port

    #When working with air, a backlash is not desired as it might pump some buffer
    #into the air filter, because the backlash is bigger than the air volume.
    original_backlash = self.pump.getBacklashSteps()
    self.pump.setBacklashSteps(0, execute=True)
    try:
        self.pump.extract(volume_ul=bubble_volume, from_port='output', execute=True)
    finally:
        #Always restore the backlash, also when the extraction fails, so
        #later pump moves do not silently run without backlash.
        self.pump.setBacklashSteps(original_backlash, execute=True)
def resetReservoir(self, replace_volume = 200, update_buffer = False):
"""
Empties the syringe to the waste port and replaces the "replace_volume" ul
of RunningBuffer in the reservoir that is closest to the multivalve.
Sequence: connect the valve to 'Waste', switch the pump to its output,
move the plunger to absolute position 0, then (for a positive volume)
extract "replace_volume" from the pump input and dispense it to the output.
Input:
`replace_volume`(int): Volume to replace in reservoir, in ul. Default 200ul.
A value of 0 only empties the syringe to waste.
`update_buffer`(bool): If true it will update the RunningBuffer and Waste volumes
by "replace_volume", without running the buffer check.
Raises ValueError if `replace_volume` is outside the accepted range.
"""
#Syringe size in ul as reported by the pump object
max_volume = self.pump.syringe_ul
#NOTE(review): the accepted upper bound is max_volume + 1, while the error
#message reports max_volume - confirm whether the +1 tolerance is intended.
if not 0 <= replace_volume <= (max_volume + 1):
raise(ValueError('`replace_volume` must be between 0 and {}'.format(max_volume)))
self.connectPort('Waste') #Change to waste port
self.pump.changePort('output', execute=True) #change to output
self.pump.movePlungerAbs(0, execute=True) #return pump to 0, pushing its content to waste
if replace_volume > 0:
#Draw fresh liquid from the pump input and push it through to waste
self.pump.extract(volume_ul = replace_volume, from_port = 'input', execute=True)
self.pump.dispense(volume_ul= replace_volume, to_port = 'output', execute=True)
if update_buffer == True:
#Book the replaced volume: RunningBuffer is consumed, Waste receives it
self.updateBuffer('RunningBuffer', replace_volume, check=False)
self.updateBuffer('Waste', replace_volume, check=False)
#=============================================================================
# Function decorator
#=============================================================================
def functionWrap(function):
"""
Wrapper for high level functions.
The wrapper will execute the following steps (indicated with *):
*Check if the experiment is paused by the user.
*Check if the database is updated and if so update the parameters.
*If buffers are replaced, prime their respective fluid lines.
*Start timer.
Execute function.
*Stop timer.
*Calculate time to wait.
*Sleep the wait time minus function execution time.
Input as keyword arguments:
`d`(int): number of days to wait.
`h`(int): number of hours to wait.
`m`(int): number of minutes to wait.
`s`(int): number of seconds to wait.
The function can take up till 10% of the incubation time it is given.
If it extends that it will be set to 10% of the incubation time. And
the remaining wait time will be 90% of the given incubation time.
"""
@wraps(function)
def wrapped(self, *args, **kwargs):
#Check if user paused the experiment
count = 0
while True:
pause = perif.returnDictDB(self.db_path, 'Flags')[0]['Pause_flag']
if pause == 1:
if count == 0:
print('Experiment paused. Continue the experiment in the user program.')
time.sleep(1)
count += 1
#Check for errors on the system and send a pause reminder after 10min, every 10min.
if count >= 600 and (count%600) == 0:
self.check_error(35, 5, 10)
self.L.logger.info('Experiment paused by user. Already {} minutes.'.format(round(count/6)))
self.push(short_message= 'Experiment paused',
long_message='Experiment is still paused by user. Already {} minutes.'.format(round(count/6)))
else:
break
#Update
self.updateExperimentalParameters(self.db_path, ignore_flags=False)
#Prime buffers if the prime flag has been set for the specific buffer
current_flags = perif.returnDictDB(self.db_path, 'Flags')[0]
for p in self.Ports:
if current_flags[p] == 1:
self.prime(p)
perif.removeFlagDB(self.db_path, p)
self.L.logger.info('Primed port {} connected to {} buffer after replacement.'.format(p, self.Ports[p]))
#Input handling
if 'd' in kwargs:
sleep_d = kwargs['d'] * 86400
del kwargs['d'] #If it is not deleted the function gets the kwargs, which will be invalid arguments.
else:
sleep_d = 0