forked from d5000/acme-minitools
-
Notifications
You must be signed in to change notification settings - Fork 0
/
rpchost.py
48 lines (43 loc) · 1.98 KB
/
rpchost.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#! /usr/bin/env python3
"""Originally written for the ACME project as part of the script blocknotifybase.py.
(c) Graham Higgins, The Slimcoin Developers
License: MIT
"""
try:
import simplejson as json # needed for Decimal conversions, test if it breaks something
except ImportError:
import json
import requests
import time
class RPCHost(object):
    """Minimal JSON-RPC client bound to a single server URL.

    One ``requests.Session`` is created per instance and used for every
    call made through it.
    """

    def __init__(self, url):
        """Store the endpoint *url* and prepare the session and headers."""
        self._session = requests.Session()
        self._url = url
        self._headers = {'content-type': 'application/json'}

    def call(self, rpcMethod, *params):
        """Invoke *rpcMethod* on the server and return its ``result`` value.

        Positional *params* are sent as the JSON-RPC ``params`` list.

        Connection errors are retried: up to ten attempts in total, with a
        ten second sleep after each failed attempt, reporting progress on
        stdout.

        Raises:
            Exception: if no attempt could connect, if the HTTP status is
                neither 200 nor 500, if the response body is not valid
                JSON, or if the response carries a non-null ``error``.
        """
        payload = json.dumps({"method": rpcMethod, "params": list(params), "jsonrpc": "2.0"})
        tries = 10
        hadConnectionFailures = False
        while True:
            try:
                # POST, not GET: the request carries a JSON body, and a body
                # on a GET request has no defined semantics in HTTP, so
                # servers are free to ignore or reject it.
                response = self._session.post(self._url, headers=self._headers, data=payload)
            except requests.exceptions.ConnectionError as err:
                tries -= 1
                if tries == 0:
                    # Chain the cause so the underlying network error stays
                    # visible in the traceback.
                    raise Exception('Failed to connect for remote procedure call.') from err
                hadConnectionFailures = True
                print("Couldn't connect for remote procedure call, will sleep for ten seconds and then try again ({} more tries)".format(tries))
                time.sleep(10)
            else:
                if hadConnectionFailures:
                    print('Connected for remote procedure call after retry.')
                break
        # 500 is let through so that a JSON error body, if present, is
        # reported by the 'error' check below (presumably the daemon signals
        # RPC-level failures this way -- confirm against the target server).
        if response.status_code not in (200, 500):
            raise Exception('RPC connection failure: ' + str(response.status_code) + ' ' + response.reason)
        try:
            responseJSON = response.json()
        except ValueError as err:
            # A non-JSON body (e.g. an HTML error page) would otherwise
            # surface as a bare decode error without the HTTP status.
            raise Exception('Invalid JSON in RPC response: ' + str(response.status_code) + ' ' + response.reason) from err
        if 'error' in responseJSON and responseJSON['error'] is not None:
            raise Exception('Error in RPC call: ' + str(responseJSON['error']))
        return responseJSON['result']