forked from manhinhang/ib-gateway-docker
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_ib_gateway.py
52 lines (46 loc) · 1.53 KB
/
test_ib_gateway.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import pytest
import subprocess
import testinfra
import os
# Name of the Docker image to run in the tests, taken from the IMAGE_NAME
# environment variable; a missing variable raises KeyError at import time.
IMAGE_NAME = os.environ['IMAGE_NAME']
# scope='session' uses the same container for all the tests;
# scope='function' uses a new container per test function.
@pytest.fixture(scope='session')
def host(request):
    """Run the image under test in a container and yield a testinfra host.

    The ``IB_ACCOUNT``, ``IB_PASSWORD`` and ``TRADE_MODE`` environment
    variables are forwarded to the container, port 4002 is published on the
    host, and ``tail -f /dev/null`` is passed as the container command
    (presumably to keep the container alive; how the image's entrypoint
    treats it is not visible from this file).

    The container is force-removed once the fixture is torn down, including
    when creating the testinfra connection fails.

    Raises:
        KeyError: if one of the required environment variables is not set.
        subprocess.CalledProcessError: if ``docker run`` or ``docker rm``
            exits with a non-zero status.
    """
    env_args = []
    for name in ('IB_ACCOUNT', 'IB_PASSWORD', 'TRADE_MODE'):
        if name not in os.environ:
            raise KeyError(name)
        # A bare ``--env NAME`` tells docker to copy the value from its own
        # environment (inherited from this process), so the credentials
        # never show up in the command line / process listing.
        env_args += ['--env', name]

    # run a container
    docker_id = subprocess.check_output(
        ['docker', 'run']
        + env_args
        + ['-p', '4002:4002',
           '-d', IMAGE_NAME,
           'tail', '-f', '/dev/null']).decode().strip()
    try:
        # return a testinfra connection to the container
        yield testinfra.get_host("docker://" + docker_id)
    finally:
        # at the end of the test suite (or on setup failure after the
        # container was started), destroy the container
        subprocess.check_call(['docker', 'rm', '-f', docker_id])
def test_ibgateway_version(host):
    """Check that ``/root/Jts/ibgateway`` lists as a single integer.

    There is no explicit assertion: the test fails with ``ValueError`` when
    the output of ``ls`` cannot be parsed by ``int()`` (for example when the
    directory is missing or holds anything other than one numeric entry,
    presumably the installed gateway version).
    """
    listing = host.run("ls /root/Jts/ibgateway")
    # int() tolerates the trailing newline that ls prints.
    int(listing.stdout)
def test_ib_connect(host):
    """Check that an API client inside the container can connect on 4002.

    A small ``ib_insync`` script is executed in the container with
    ``python -c``. It tries to connect to ``localhost:4002`` up to 60 times,
    sleeping one second before each attempt, and exits with status 0 only if
    a connection was actually established; otherwise it exits with status 1
    so that the test fails instead of passing vacuously.
    """
    # NOTE: the script is embedded in a double-quoted shell argument below,
    # so it must not contain double quotes, backticks or dollar signs.
    # asyncio.TimeoutError is caught in addition to the futures one because
    # the two are distinct classes on Python 3.8-3.10, and a connect timeout
    # there would otherwise abort the retry loop.
    script = """
from ib_insync import *
from concurrent.futures import TimeoutError
import asyncio
ib = IB()
wait = 60
while not ib.isConnected():
    try:
        IB.sleep(1)
        ib.connect('localhost', 4002, clientId=998)
    except (ConnectionRefusedError, OSError, TimeoutError, asyncio.TimeoutError):
        pass
    wait -= 1
    if wait <= 0:
        break
connected = ib.isConnected()
ib.disconnect()
raise SystemExit(0 if connected else 1)
"""
    cmd = host.run("python -c \"{}\"".format(script))
    assert cmd.rc == 0, (
        "could not connect to the gateway on localhost:4002 "
        "(rc={}, stderr={!r})".format(cmd.rc, cmd.stderr)
    )