forked from UniversalRobots/Universal_Robots_Client_Library
-
Notifications
You must be signed in to change notification settings - Fork 0
/
external_control.urscript
781 lines (692 loc) · 30 KB
/
external_control.urscript
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
# HEADER_BEGIN
{{BEGIN_REPLACE}}
steptime = get_steptime()
textmsg("ExternalControl: steptime=", steptime)
MULT_jointstate = {{JOINT_STATE_REPLACE}}
MULT_time = {{TIME_REPLACE}}
STOPJ_ACCELERATION = 4.0
#Constants
SERVO_UNINITIALIZED = -1
SERVO_IDLE = 0
SERVO_RUNNING = 1
MODE_STOPPED = -2
MODE_UNINITIALIZED = -1
MODE_IDLE = 0
MODE_SERVOJ = 1
MODE_SPEEDJ = 2
MODE_FORWARD = 3
MODE_SPEEDL = 4
MODE_POSE = 5
MODE_FREEDRIVE = 6
MODE_TOOL_IN_CONTACT = 7
# Data dimensions of the message received on the reverse interface
REVERSE_INTERFACE_DATA_DIMENSION = 8
TRAJECTORY_MODE_RECEIVE = 1
TRAJECTORY_MODE_CANCEL = -1
TRAJECTORY_POINT_JOINT = 0
TRAJECTORY_POINT_CARTESIAN = 1
TRAJECTORY_POINT_JOINT_SPLINE = 2
TRAJECTORY_DATA_DIMENSION = 3*6 + 1
TRAJECTORY_RESULT_SUCCESS = 0
TRAJECTORY_RESULT_CANCELED = 1
TRAJECTORY_RESULT_FAILURE = 2
ZERO_FTSENSOR = 0
SET_PAYLOAD = 1
SET_TOOL_VOLTAGE = 2
START_FORCE_MODE = 3
END_FORCE_MODE = 4
START_TOOL_CONTACT = 5
END_TOOL_CONTACT = 6
SCRIPT_COMMAND_DATA_DIMENSION = 26
FREEDRIVE_MODE_START = 1
FREEDRIVE_MODE_STOP = -1
UNTIL_TOOL_CONTACT_RESULT_SUCCESS = 0
UNTIL_TOOL_CONTACT_RESULT_CANCELED = 1
SPLINE_CUBIC = 1
SPLINE_QUINTIC = 2
# Maximum allowable joint speed in rad/s
MAX_JOINT_SPEED = 6.283185
# Any motion commands resulting in a velocity higher than that will be ignored.
JOINT_IGNORE_SPEED = 20.0
#Global variables are also showed in the Teach pendants variable list
global violation_popup_counter = 0
global cmd_servo_state = SERVO_UNINITIALIZED
global cmd_servo_qd = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
global cmd_servo_q = get_actual_joint_positions()
global cmd_servo_q_last = get_actual_joint_positions()
global cmd_twist = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
global extrapolate_count = 0
global extrapolate_max_count = 0
global control_mode = MODE_UNINITIALIZED
global trajectory_points_left = 0
global spline_qdd = [0, 0, 0, 0, 0, 0]
global spline_qd = [0, 0, 0, 0, 0, 0]
global tool_contact_running = False
global trajectory_result = 0
# Global thread variables
thread_move = 0
thread_trajectory = 0
thread_script_commands = 0
###
# @brief Function to verify whether the specified target can be reached within the defined time frame while staying within
# the robot's speed limits
#
# @param target array is the joint target to reach
# @param time float is the time to reach the target
#
# @returns bool true if the target is reachable within the robot's speed limits, false otherwise
###
def targetWithinLimits(step_start, step_end, time):
local idx = 0
while idx < 6:
local velocity = norm(step_end[idx] - step_start[idx]) / time
if velocity > JOINT_IGNORE_SPEED:
local str = str_cat(str_cat("Velocity ", velocity), str_cat(str_cat(" required to reach the received target ", step_end), str_cat(str_cat(" within ", time), " seconds is exceeding the joint velocity limits. Ignoring commands until a valid command is received.")))
if violation_popup_counter == 0:
# We want a popup when an invalid commant is sent. As long as we keep sending invalid
# commands, we do not want to repeat the popup.
popup(str, title="External Control error", blocking=False, error=True)
end
if violation_popup_counter * get_steptime() % 5.0 == 0:
# We want to have a log printed regularly. We are receiving motion commands that are not
# feasible. The user should have a chance to know about this.
textmsg(str)
end
violation_popup_counter = violation_popup_counter + 1
return False
end
idx = idx + 1
end
if violation_popup_counter > 0:
textmsg("Received valid command. Resuming execution.")
violation_popup_counter = 0
end
return True
end
def trajectory_result_to_str(trajectory_result):
if trajectory_result == TRAJECTORY_RESULT_SUCCESS:
return "SUCCESS"
end
if trajectory_result == TRAJECTORY_RESULT_CANCELED:
return "CANCELED"
end
if trajectory_result == TRAJECTORY_RESULT_FAILURE:
return "FAILURE"
end
return "UNKNOWN"
end
def terminateProgram():
textmsg("Terminating robot program due to targets sent to the robot is violating robot constraints.")
stopj(STOPJ_ACCELERATION)
halt
end
def set_servo_setpoint(q):
cmd_servo_state = SERVO_RUNNING
cmd_servo_q_last = cmd_servo_q
cmd_servo_q = q
end
def extrapolate():
diff = [cmd_servo_q[0] - cmd_servo_q_last[0], cmd_servo_q[1] - cmd_servo_q_last[1], cmd_servo_q[2] - cmd_servo_q_last[2], cmd_servo_q[3] - cmd_servo_q_last[3], cmd_servo_q[4] - cmd_servo_q_last[4], cmd_servo_q[5] - cmd_servo_q_last[5]]
cmd_servo_q_last = cmd_servo_q
cmd_servo_q = [cmd_servo_q[0] + diff[0], cmd_servo_q[1] + diff[1], cmd_servo_q[2] + diff[2], cmd_servo_q[3] + diff[3], cmd_servo_q[4] + diff[4], cmd_servo_q[5] + diff[5]]
return cmd_servo_q
end
thread servoThread():
textmsg("ExternalControl: Starting servo thread")
state = SERVO_IDLE
local q_last = get_target_joint_positions()
while control_mode == MODE_SERVOJ:
enter_critical
q = cmd_servo_q
do_extrapolate = False
if (cmd_servo_state == SERVO_IDLE):
do_extrapolate = True
end
state = cmd_servo_state
if cmd_servo_state > SERVO_UNINITIALIZED:
cmd_servo_state = SERVO_IDLE
end
if do_extrapolate:
extrapolate_count = extrapolate_count + 1
if extrapolate_count > extrapolate_max_count:
extrapolate_max_count = extrapolate_count
end
q = extrapolate()
if targetWithinLimits(q_last, q, steptime):
servoj(q, t=steptime, {{SERVO_J_REPLACE}})
q_last = q
end
elif state == SERVO_RUNNING:
extrapolate_count = 0
if targetWithinLimits(q_last, q, steptime):
servoj(q, t=steptime, {{SERVO_J_REPLACE}})
q_last = q
end
else:
extrapolate_count = 0
sync()
end
exit_critical
end
textmsg("ExternalControl: servo thread ended")
stopj(STOPJ_ACCELERATION)
end
# Helpers for speed control
def set_speed(qd):
cmd_servo_qd = qd
control_mode = MODE_SPEEDJ
end
thread speedThread():
textmsg("ExternalControl: Starting speed thread")
while control_mode == MODE_SPEEDJ:
qd = cmd_servo_qd
speedj(qd, 40.0, steptime)
end
textmsg("ExternalControl: speedj thread ended")
stopj(STOPJ_ACCELERATION)
end
# Function return value (bool) determines whether the robot is moving after this spline segment or
# not.
def cubicSplineRun(end_q, end_qd, time, is_last_point=False, is_first_point=False):
local str = str_cat(end_q, str_cat(end_qd, time))
textmsg("Cubic spline arg: ", str)
local start_q = get_target_joint_positions()
# Zero time means infinite velocity to reach the target and is therefore impossible
if time <= 0.0:
if is_first_point and time == 0.0:
# If users specify the current joint position with time 0 that may be fine, in that case just
# ignore that point
local distance = norm(end_q - get_target_joint_positions())
if distance < 0.01:
local splineTimerTraveled = 0.0
# USED_IN_TEST_SPLINE_INTERPOLATION write_output_float_register(1, splineTimerTraveled)
textmsg("Ignoring first point with time 0")
return False
end
end
error_str = "Spline time shouldn't be zero as it would require infinite velocity to reach the target. Canceling motion."
textmsg(error_str)
trajectory_result = TRAJECTORY_RESULT_CANCELED
popup(error_str, title="External Control error", blocking=False, error=True)
return False
elif targetWithinLimits(start_q, end_q, time):
local start_qd = spline_qd
# Coefficients0 is not included, since we do not need to calculate the position
local coefficients1 = start_qd
local coefficients2 = (-3 * start_q + end_q * 3 - start_qd * 2 * time - end_qd * time) / pow(time, 2)
local coefficients3 = (2 * start_q - 2 * end_q + start_qd * time + end_qd * time) / pow(time, 3)
local coefficients4 = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
local coefficients5 = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
jointSplineRun(coefficients1, coefficients2, coefficients3, coefficients4, coefficients5, time, is_last_point)
return True and (is_last_point == False)
else:
trajectory_result = TRAJECTORY_RESULT_CANCELED
return False
end
end
# Function return value (bool) determines whether the robot is moving after this spline segment or
# not.
def quinticSplineRun(end_q, end_qd, end_qdd, time, is_last_point=False, is_first_point=False):
local str = str_cat(end_q, str_cat(end_qd, str_cat(end_qdd, time)))
textmsg("Quintic spline arg: ", str)
local start_q = get_target_joint_positions()
# Zero time means infinite velocity to reach the target and is therefore impossible
if time <= 0.0:
if is_first_point and time == 0.0:
# If users specify the current joint position with time 0 that may be fine, in that case just
# ignore that point
local distance = norm(end_q - get_target_joint_positions())
if distance < 0.01:
return False
end
end
error_str = "Spline time shouldn't be zero as it would require infinite velocity to reach the target. Canceling motion."
textmsg(error_str)
trajectory_result = TRAJECTORY_RESULT_CANCELED
popup(error_str, title="External Control error", blocking=False, error=True)
return False
elif targetWithinLimits(start_q, end_q, time):
local start_qd = spline_qd
local start_qdd = spline_qdd
# Pre-calculate coefficients
local TIME2 = pow(time, 2)
# Coefficients0 is not included, since we do not need to calculate the position
local coefficients1 = start_qd
local coefficients2 = 0.5 * start_qdd
local coefficients3 = (-20.0 * start_q + 20.0 * end_q - 3.0 * start_qdd * TIME2 + end_qdd * TIME2 - 12.0 * start_qd * time - 8.0 * end_qd * time) / (2.0 * pow(time, 3))
local coefficients4 = (30.0 * start_q - 30.0 * end_q + 3.0 * start_qdd * TIME2 - 2.0 * end_qdd * TIME2 + 16.0 * start_qd * time + 14.0 * end_qd * time) / (2.0 * pow(time, 4))
local coefficients5 = (-12.0 * start_q + 12.0 * end_q - start_qdd * TIME2 + end_qdd * TIME2 - 6.0 * start_qd * time - 6.0 * end_qd * time) / (2.0 * pow(time, 5))
jointSplineRun(coefficients1, coefficients2, coefficients3, coefficients4, coefficients5, time, is_last_point)
return True and (is_last_point == False)
else:
trajectory_result = TRAJECTORY_RESULT_CANCELED
return False
end
end
def jointSplineRun(coefficients1, coefficients2, coefficients3, coefficients4, coefficients5, splineTotalTravelTime, is_last_point):
# Initialize variables
local splineTimerTraveled = 0.0
local scaled_step_time = get_steptime()
local scaling_factor = 1.0
local is_slowing_down = False
local slowing_down_time = 0.0
# Interpolate the spline in whole time steps
while (splineTotalTravelTime - splineTimerTraveled) > get_steptime():
local time_left = splineTotalTravelTime - splineTimerTraveled
# Maximum deceleration in rad/s^2
local max_deceleration = 15
# The time needed to decelerate with 15 rad/s^2 to reach zero velocity when we move at maximum velocity.
local deceleration_time = MAX_JOINT_SPEED / max_deceleration
# If the velocity is too large close to the end of the trajectory we scale the
# trajectory, such that we follow the positional part of the trajectory, but end with zero velocity.
if (time_left <= deceleration_time) and (is_last_point == True):
if is_slowing_down == False:
# Peek what the joint velocities will be if we take a full time step
local qd = jointSplinePeek(coefficients1, coefficients2, coefficients3, coefficients4, coefficients5, splineTimerTraveled + get_steptime())
# Compute the time left to decelerate if we take a full time step
local x = deceleration_time - (time_left - get_steptime())
is_slowing_down = checkSlowDownRequired(x, qd, max_deceleration)
if is_slowing_down == True:
# This will ensure that we scale the trajectory right away
slowing_down_time = time_left + get_steptime()
textmsg("Velocity is too fast towards the end of the trajectory. The robot will be slowing down, while following the positional part of the trajectory.")
end
end
if is_slowing_down == True:
# Compute scaling factor based on time left and total slowing down time
scaling_factor = time_left / slowing_down_time
scaled_step_time = get_steptime() * scaling_factor
end
end
splineTimerTraveled = splineTimerTraveled + scaled_step_time
jointSplineStep(coefficients1, coefficients2, coefficients3, coefficients4, coefficients5, splineTimerTraveled, get_steptime(), scaling_factor, is_slowing_down)
end
# Make sure that we approach zero velocity slowly, when slowing down
if is_slowing_down == True:
local time_left = splineTotalTravelTime - splineTimerTraveled
while time_left >= 1e-5:
time_left = splineTotalTravelTime - splineTimerTraveled
# Compute scaling factor based on time left and total slowing down time
scaling_factor = time_left / slowing_down_time
scaled_step_time = get_steptime() * scaling_factor
splineTimerTraveled = splineTimerTraveled + scaled_step_time
jointSplineStep(coefficients1, coefficients2, coefficients3, coefficients4, coefficients5, splineTimerTraveled, get_steptime(), scaling_factor, is_slowing_down)
end
scaling_factor = 0.0
end
# Last part of the spline which uses less than one time step
local timeLeftToTravel = splineTotalTravelTime - splineTimerTraveled
# To round off the float to the steptime step when it is very close to that number (1e-15)
if timeLeftToTravel == get_steptime():
timeLeftToTravel = get_steptime()
end
jointSplineStep(coefficients1, coefficients2, coefficients3, coefficients4, coefficients5, splineTotalTravelTime, timeLeftToTravel, scaling_factor, is_slowing_down)
end
def jointSplineStep(coefficients1, coefficients2, coefficients3, coefficients4, coefficients5, splineTimerTraveled, timestep, scaling_factor, is_slowing_down=False):
local last_spline_qd = spline_qd
spline_qd = coefficients1 + 2.0 * splineTimerTraveled * coefficients2 + 3.0 * pow(splineTimerTraveled, 2) * coefficients3 + 4.0 * pow(splineTimerTraveled, 3) * coefficients4 + 5.0 * pow(splineTimerTraveled, 4) * coefficients5
spline_qdd = 2.0 * coefficients2 + 6.0 * splineTimerTraveled * coefficients3 + 12.0 * pow(splineTimerTraveled, 2) * coefficients4 + 20.0 * pow(splineTimerTraveled, 3) * coefficients5
spline_qd = spline_qd * scaling_factor
# Calculate the max needed qdd arg for speedj to distribute the velocity change over the whole timestep no matter the speed slider value
qdd_max = list_max_norm((spline_qd - last_spline_qd) / timestep)
# USED_IN_TEST_SPLINE_INTERPOLATION write_output_float_register(1, splineTimerTraveled)
speedj(spline_qd, qdd_max, timestep)
end
# Helper function to see what the velocity will be if we take a full step
def jointSplinePeek(coefficients1, coefficients2, coefficients3, coefficients4, coefficients5, splineTimerTraveled):
local qd = coefficients1 + 2.0 * splineTimerTraveled * coefficients2 + 3.0 * pow(splineTimerTraveled, 2) * coefficients3 + 4.0 * pow(splineTimerTraveled, 3) * coefficients4 + 5.0 * pow(splineTimerTraveled, 4) * coefficients5
return qd
end
###
# @brief Helper function to figure out if we need to slown down velocity at the last part of the last spline segment.
#
# The max_speed and max_deceleration is used to compute a linear velocity profile as function of the time left to decelerate,
# then we can use the time left to decelerate to find the maximum allowed speed given the time we have left. If we are above
# the maximum allowed speed, we start slowing down.
#
# @param x float is the time left to decelerate
# @param qd array is the velocity if we take a full step ahead in the spline
# @param max_deceleration float is the maximum allowed deceleration 15 rad/s^2
#
# @returns bool True if slow don is required, false otherwise
###
def checkSlowDownRequired(x, qd, max_deceleration):
local max_allowable_speed = MAX_JOINT_SPEED - x * max_deceleration
local idx = 0
while idx < 6:
if norm(qd[idx]) > max_allowable_speed:
return True
end
idx = idx + 1
end
return False
end
###
# @brief Find the maximum value in a list the list must be of non-zero length and contain numbers
# @param list array list
###
def list_max_norm(list):
# ensure we have something to iterate over
local length = get_list_length(list)
if length < 1:
popup("Getting the maximum of an empty list is impossible in list_max().", error = True, blocking = True)
textmsg("Getting the maximum of an empty list is impossible in list_max()")
halt
end
# search for maximum
local i = 1
local max = norm(list[0])
while i < length:
local tmp = norm(list[i])
if tmp > max:
max = tmp
end
i = i + 1
end
return max
end
thread trajectoryThread():
textmsg("Executing trajectory. Number of points: ", trajectory_points_left)
local is_first_point = True
local is_robot_moving = False
local INDEX_TIME = TRAJECTORY_DATA_DIMENSION
local INDEX_BLEND = INDEX_TIME + 1
# same index as blend parameter, depending on point type
local INDEX_SPLINE_TYPE = INDEX_BLEND
local INDEX_POINT_TYPE = INDEX_BLEND + 1
spline_qdd = [0, 0, 0, 0, 0, 0]
spline_qd = [0, 0, 0, 0, 0, 0]
enter_critical
trajectory_result = TRAJECTORY_RESULT_SUCCESS
while trajectory_result == TRAJECTORY_RESULT_SUCCESS and trajectory_points_left > 0:
local timeout = 0.5
if is_robot_moving:
timeout = get_steptime()
end
#reading trajectory point + blend radius + type of point (cartesian/joint based)
local raw_point = socket_read_binary_integer(TRAJECTORY_DATA_DIMENSION+1+1, "trajectory_socket", timeout)
trajectory_points_left = trajectory_points_left - 1
if raw_point[0] > 0:
local q = [ raw_point[1]/ MULT_jointstate, raw_point[2]/ MULT_jointstate, raw_point[3]/ MULT_jointstate, raw_point[4]/ MULT_jointstate, raw_point[5]/ MULT_jointstate, raw_point[6]/ MULT_jointstate]
local tmptime = raw_point[INDEX_TIME] / MULT_time
local blend_radius = raw_point[INDEX_BLEND] / MULT_time
local is_last_point = False
if trajectory_points_left == 0:
blend_radius = 0.0
is_last_point = True
end
# MoveJ point
if raw_point[INDEX_POINT_TYPE] == TRAJECTORY_POINT_JOINT:
movej(q, t=tmptime, r=blend_radius)
# reset old acceleration
spline_qdd = [0, 0, 0, 0, 0, 0]
spline_qd = [0, 0, 0, 0, 0, 0]
# Movel point
elif raw_point[INDEX_POINT_TYPE] == TRAJECTORY_POINT_CARTESIAN:
movel(p[q[0], q[1], q[2], q[3], q[4], q[5]], t=tmptime, r=blend_radius)
# reset old acceleration
spline_qdd = [0, 0, 0, 0, 0, 0]
spline_qd = [0, 0, 0, 0, 0, 0]
# Joint spline point
elif raw_point[INDEX_POINT_TYPE] == TRAJECTORY_POINT_JOINT_SPLINE:
# Cubic spline
if raw_point[INDEX_SPLINE_TYPE] == SPLINE_CUBIC:
qd = [ raw_point[7] / MULT_jointstate, raw_point[8] / MULT_jointstate, raw_point[9] / MULT_jointstate, raw_point[10] / MULT_jointstate, raw_point[11] / MULT_jointstate, raw_point[12] / MULT_jointstate]
is_robot_moving = cubicSplineRun(q, qd, tmptime, is_last_point, is_first_point)
# reset old acceleration
spline_qdd = [0, 0, 0, 0, 0, 0]
# Quintic spline
elif raw_point[INDEX_SPLINE_TYPE] == SPLINE_QUINTIC:
qd = [ raw_point[7] / MULT_jointstate, raw_point[8] / MULT_jointstate, raw_point[9] / MULT_jointstate, raw_point[10] / MULT_jointstate, raw_point[11] / MULT_jointstate, raw_point[12] / MULT_jointstate]
qdd = [ raw_point[13]/ MULT_jointstate, raw_point[14]/ MULT_jointstate, raw_point[15]/ MULT_jointstate, raw_point[16]/ MULT_jointstate, raw_point[17]/ MULT_jointstate, raw_point[18]/ MULT_jointstate]
is_robot_moving = quinticSplineRun(q, qd, qdd, tmptime, is_last_point, is_first_point)
else:
textmsg("Unknown spline type given:", raw_point[INDEX_POINT_TYPE])
clear_remaining_trajectory_points()
trajectory_result = TRAJECTORY_RESULT_FAILURE
end
end
else:
textmsg("Receiving trajectory point failed!")
trajectory_result = TRAJECTORY_RESULT_FAILURE
end
is_first_point = False
end
exit_critical
stopj(STOPJ_ACCELERATION)
socket_send_int(trajectory_result, "trajectory_socket")
textmsg("Trajectory finished with result ", trajectory_result_to_str(trajectory_result))
end
def clear_remaining_trajectory_points():
while trajectory_points_left > 0:
raw_point = socket_read_binary_integer(TRAJECTORY_DATA_DIMENSION+2, "trajectory_socket")
trajectory_points_left = trajectory_points_left - 1
end
end
# Helpers for speed control
def set_speedl(twist):
cmd_twist = twist
control_mode = MODE_SPEEDL
end
thread speedlThread():
textmsg("Starting speedl thread")
while control_mode == MODE_SPEEDL:
twist = cmd_twist
speedl(twist, 40.0, steptime)
end
textmsg("speedl thread ended")
stopj(STOPJ_ACCELERATION)
end
thread servoThreadP():
textmsg("Starting pose servo thread")
state = SERVO_IDLE
local q_last = get_target_joint_positions()
while control_mode == MODE_POSE:
enter_critical
q = cmd_servo_q
do_extrapolate = False
if (cmd_servo_state == SERVO_IDLE):
do_extrapolate = True
end
state = cmd_servo_state
if cmd_servo_state > SERVO_UNINITIALIZED:
cmd_servo_state = SERVO_IDLE
end
if do_extrapolate:
extrapolate_count = extrapolate_count + 1
if extrapolate_count > extrapolate_max_count:
extrapolate_max_count = extrapolate_count
end
q = extrapolate()
if targetWithinLimits(q_last, q, steptime):
servoj(q, t=steptime, {{SERVO_J_REPLACE}})
end
elif state == SERVO_RUNNING:
extrapolate_count = 0
if targetWithinLimits(q_last, q, steptime):
servoj(q, t=steptime, {{SERVO_J_REPLACE}})
end
else:
extrapolate_count = 0
sync()
end
exit_critical
end
textmsg("pose servo thread ended")
stopj(STOPJ_ACCELERATION)
end
def set_servo_pose(pose):
cmd_servo_state = SERVO_RUNNING
cmd_servo_q_last = cmd_servo_q
cmd_servo_q = get_inverse_kin(pose, cmd_servo_q)
end
def tool_contact_detection():
# Detect tool contact in the directions that the TCP is moving
step_back = tool_contact(direction = get_actual_tcp_speed())
# If tool contact is detected stop movement and move back to intial contact point
if step_back > 0:
if control_mode == MODE_FORWARD:
kill thread_trajectory
clear_remaining_trajectory_points()
elif control_mode == MODE_FREEDRIVE:
textmsg("Leaving freedrive mode")
end_freedrive_mode()
else:
kill thread_move
end
# Set control mode to tool in contact, should be cleared by stopping tool contact detection
control_mode = MODE_TOOL_IN_CONTACT
stopl(3)
# Move to initial contact point
q = get_actual_joint_positions_history(step_back)
movel(q)
socket_send_int(UNTIL_TOOL_CONTACT_RESULT_SUCCESS, "script_command_socket")
textmsg("tool contact detected")
end
end
# Thread to receive one shot script commands, the commands shouldn't be blocking
thread script_commands():
while control_mode > MODE_STOPPED:
raw_command = socket_read_binary_integer(SCRIPT_COMMAND_DATA_DIMENSION, "script_command_socket", 0)
if raw_command[0] > 0:
command = raw_command[1]
if command == ZERO_FTSENSOR:
zero_ftsensor()
elif command == SET_PAYLOAD:
mass = raw_command[2] / MULT_jointstate
cog = [raw_command[3] / MULT_jointstate, raw_command[4] / MULT_jointstate, raw_command[5] / MULT_jointstate]
set_payload(mass, cog)
elif command == SET_TOOL_VOLTAGE:
tool_voltage = raw_command[2] / MULT_jointstate
set_tool_voltage(tool_voltage)
elif command == START_FORCE_MODE:
task_frame = p[raw_command[2] / MULT_jointstate, raw_command[3] / MULT_jointstate, raw_command[4] / MULT_jointstate, raw_command[5] / MULT_jointstate, raw_command[6] / MULT_jointstate, raw_command[7] / MULT_jointstate]
selection_vector = [raw_command[8] / MULT_jointstate, raw_command[9] / MULT_jointstate, raw_command[10] / MULT_jointstate, raw_command[11] / MULT_jointstate, raw_command[12] / MULT_jointstate, raw_command[13] / MULT_jointstate]
wrench = [raw_command[14] / MULT_jointstate, raw_command[15] / MULT_jointstate, raw_command[16] / MULT_jointstate, raw_command[17] / MULT_jointstate, raw_command[18] / MULT_jointstate, raw_command[19] / MULT_jointstate]
force_type = raw_command[20] / MULT_jointstate
force_limits = [raw_command[21] / MULT_jointstate, raw_command[22] / MULT_jointstate, raw_command[23] / MULT_jointstate, raw_command[24] / MULT_jointstate, raw_command[25] / MULT_jointstate, raw_command[26] / MULT_jointstate]
force_mode(task_frame, selection_vector, wrench, force_type, force_limits)
elif command == END_FORCE_MODE:
end_force_mode()
elif command == START_TOOL_CONTACT:
tool_contact_running = True
elif command == END_TOOL_CONTACT:
if control_mode != MODE_TOOL_IN_CONTACT:
# If tool contact hasn't been detected send canceled result
socket_send_int(UNTIL_TOOL_CONTACT_RESULT_CANCELED, "script_command_socket")
end
tool_contact_running = False
end
end
end
end
# HEADER_END
# NODE_CONTROL_LOOP_BEGINS
force_mode_set_damping({{FORCE_MODE_SET_DAMPING_REPLACE}})
force_mode_set_gain_scaling({{FORCE_MODE_SET_GAIN_SCALING_REPLACE}})
socket_open("{{SERVER_IP_REPLACE}}", {{TRAJECTORY_SERVER_PORT_REPLACE}}, "trajectory_socket")
socket_open("{{SERVER_IP_REPLACE}}", {{SCRIPT_COMMAND_SERVER_PORT_REPLACE}}, "script_command_socket")
# This socket should be opened last as it tells the driver when it has control over the robot
socket_open("{{SERVER_IP_REPLACE}}", {{SERVER_PORT_REPLACE}}, "reverse_socket")
control_mode = MODE_UNINITIALIZED
thread_move = 0
thread_trajectory = 0
trajectory_points_left = 0
textmsg("ExternalControl: External control active")
global read_timeout = 0.0 # First read is blocking
thread_script_commands = run script_commands()
while control_mode > MODE_STOPPED:
enter_critical
params_mult = socket_read_binary_integer(REVERSE_INTERFACE_DATA_DIMENSION, "reverse_socket", read_timeout)
if params_mult[0] > 0:
# Convert read timeout from milliseconds to seconds
read_timeout = params_mult[1] / 1000.0
if control_mode != params_mult[REVERSE_INTERFACE_DATA_DIMENSION]:
# Clear remaining trajectory points
if control_mode == MODE_FORWARD:
kill thread_trajectory
clear_remaining_trajectory_points()
stopj(STOPJ_ACCELERATION)
socket_send_int(TRAJECTORY_RESULT_CANCELED, "trajectory_socket")
# Stop freedrive
elif control_mode == MODE_FREEDRIVE:
textmsg("Leaving freedrive mode")
end_freedrive_mode()
end
# If tool is in contact, tool contact should be ended before switching control mode
if control_mode == MODE_TOOL_IN_CONTACT:
if tool_contact_running == False:
control_mode = params_mult[REVERSE_INTERFACE_DATA_DIMENSION]
end
else:
control_mode = params_mult[REVERSE_INTERFACE_DATA_DIMENSION]
join thread_move
end
if control_mode == MODE_SERVOJ:
thread_move = run servoThread()
elif control_mode == MODE_SPEEDJ:
thread_move = run speedThread()
elif control_mode == MODE_FORWARD:
kill thread_move
stopj(STOPJ_ACCELERATION)
elif control_mode == MODE_SPEEDL:
thread_move = run speedlThread()
elif control_mode == MODE_POSE:
thread_move = run servoThreadP()
end
end
# Update the motion commands with new parameters
if control_mode == MODE_SERVOJ:
q = [params_mult[2]/ MULT_jointstate, params_mult[3]/ MULT_jointstate, params_mult[4]/ MULT_jointstate, params_mult[5]/ MULT_jointstate, params_mult[6]/ MULT_jointstate, params_mult[7]/ MULT_jointstate]
set_servo_setpoint(q)
elif control_mode == MODE_SPEEDJ:
qd = [params_mult[2]/ MULT_jointstate, params_mult[3]/ MULT_jointstate, params_mult[4]/ MULT_jointstate, params_mult[5]/ MULT_jointstate, params_mult[6]/ MULT_jointstate, params_mult[7]/ MULT_jointstate]
set_speed(qd)
elif control_mode == MODE_FORWARD:
if params_mult[2] == TRAJECTORY_MODE_RECEIVE:
kill thread_trajectory
clear_remaining_trajectory_points()
trajectory_points_left = params_mult[3]
thread_trajectory = run trajectoryThread()
elif params_mult[2] == TRAJECTORY_MODE_CANCEL:
textmsg("cancel received")
kill thread_trajectory
clear_remaining_trajectory_points()
stopj(STOPJ_ACCELERATION)
socket_send_int(TRAJECTORY_RESULT_CANCELED, "trajectory_socket")
end
elif control_mode == MODE_SPEEDL:
twist = [params_mult[2] / MULT_jointstate, params_mult[3] / MULT_jointstate, params_mult[4] / MULT_jointstate, params_mult[5] / MULT_jointstate, params_mult[6] / MULT_jointstate, params_mult[7] / MULT_jointstate]
set_speedl(twist)
elif control_mode == MODE_POSE:
pose = p[params_mult[2] / MULT_jointstate, params_mult[3] / MULT_jointstate, params_mult[4] / MULT_jointstate, params_mult[5] / MULT_jointstate, params_mult[6] / MULT_jointstate, params_mult[7] / MULT_jointstate]
set_servo_pose(pose)
elif control_mode == MODE_FREEDRIVE:
if params_mult[2] == FREEDRIVE_MODE_START:
textmsg("Entering freedrive mode")
freedrive_mode()
elif params_mult[2] == FREEDRIVE_MODE_STOP:
textmsg("Leaving freedrive mode")
end_freedrive_mode()
end
end
# Tool contact is running, but hasn't been detected
if tool_contact_running == True and control_mode != MODE_TOOL_IN_CONTACT:
tool_contact_detection()
end
else:
textmsg("Socket timed out waiting for command on reverse_socket. The script will exit now.")
control_mode = MODE_STOPPED
end
exit_critical
end
textmsg("ExternalControl: Stopping communication and control")
kill thread_move
kill thread_trajectory
kill thread_script_commands
stopj(STOPJ_ACCELERATION)
textmsg("ExternalControl: All threads ended")
socket_close("reverse_socket")
socket_close("trajectory_socket")
socket_close("script_command_socket")
# NODE_CONTROL_LOOP_ENDS