Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SSL support for Fritzbox communication #3

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions Example/PyDect200_Demo.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python
# -- coding: utf-8 --

from __future__ import (absolute_import, division,
print_function, unicode_literals)

Expand All @@ -16,10 +17,17 @@
except:
PyDect200 = PyDect200.PyDect200

print(u"Welcome to PyDect200 v%s, the Python AVM-DECT200 API" % PyDect200.__version__)
fritzbox_user = getpass.getpass(prompt='Please insert your fritzbox-username: ', stream=None)
fritzbox_pw = getpass.getpass(prompt='Please insert your fritzbox-password: ', stream=None)
print(u'Thank you, please wait few seconds...')
f = PyDect200(fritzbox_pw)

print(u"Welcome to PyDect200 v%s, the Python AVM-DECT200 API" % PyDect200.__version__)

# you can also use the .crt exported from your Browser llike FireFox
# Firfox: -> go to login site of your FritBox
# Right Click->Siteinfromation->Security->Show Certificate->Export
# Save it
f = PyDect200.PyDect200(fritzbox_pw, fritzbox_user, "/home/fritz.pem","https://fritz.box")
f.get_sid() # SID will be timedout after 1h you have to fetch an new one after timeout
try:
info = f.get_info()
power = f.get_power_all()
Expand All @@ -37,12 +45,12 @@
except:
print(u"Device Name: %s" % dev_name.encode('utf-8').decode('utf-8', 'ignore'))


print(u"Device State: %s" % ('ON' if info.get(dev_id) == '1' else 'OFF'))
dev_power = power.get(dev_id)
if dev_power.isdigit():
dev_power = float(dev_power) / 1000
print(u"Device Power: %sW" % dev_power)
print(u"Device Energy: %sWh" % f.get_energy_single(dev_id))
print(u"Device Temperature: %s degree Celsius " % (f.get_temperature_single(dev_id)))
print(u'')

f.logout()
62 changes: 41 additions & 21 deletions PyDect200/PyDect200.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from __future__ import (absolute_import, division,
print_function, unicode_literals)
import hashlib, sys
import hashlib, sys, ssl
try:
import urllib.request as urllib2
except ImportError:
Expand All @@ -21,21 +21,32 @@ class PyDect200(object):
__author_email__ = u'[email protected]'
__description__ = u'Control Fritz AVM DECT200'

__fritz_url = u'http://fritz.box'
__fritz_url = u'https://fritz.box'
__homeswitch = u'/webservices/homeautoswitch.lua'

__debug = False

def __init__(self, fritz_password):
def __init__(self, fritz_password,username, cert=None,fritz_url=None):
"""The constructor"""
self.__password = fritz_password
self.get_sid()

self.__username = username
self.__fritz_url = fritz_url
if not fritz_url:
self.__fritz_url = u'https://fritz.box'

if not cert:
self.__context = ssl._create_unverified_context()
print("Warning SSL certificate unverified")
else:
self.__context = ssl.create_default_context(cafile=cert)
self.__context.verify_mode = ssl.CERT_REQUIRED
self.__context.check_hostname = True
print("SSL activ")

def set_url(self, url):
def set_url(self, url, ssl_conext):
"""Set alternative url"""
self.__fritz_url = url

self.__context = context

def __homeauto_url_with_sid(self):
"""Returns formatted uri"""
Expand All @@ -44,10 +55,11 @@ def __homeauto_url_with_sid(self):
self.sid)

@classmethod
def __query(cls, url):
def __query(cls, url,context):
"""Reads a URL"""

try:
return urllib2.urlopen(url).read().decode('utf-8').replace('\n', '')
return urllib2.urlopen(url,context=context).read().decode('ascii').replace('\n', '')
except urllib2.HTTPError:
_, exception, _ = sys.exc_info()
if cls.__debug:
Expand All @@ -64,22 +76,21 @@ def __query(cls, url):
pass
return "inval"



def __query_cmd(self, command, device=None):
"""Calls a command"""
url = u'%s&switchcmd=%s' % (self.__homeauto_url_with_sid(), command)
if device is None:
return self.__query(url)
res = self.__query(url,self.__context)
return res
else:
return self.__query('%s&ain=%s' % (url, device))
return self.__query('%s&ain=%s' % (url, device),self.__context)

def get_sid(self):
"""Returns a valid SID"""
base_url = u'%s/login_sid.lua' % self.__fritz_url
get_challenge = None
try:
get_challenge = urllib2.urlopen(base_url).read().decode('ascii')
get_challenge = urllib2.urlopen(base_url,context=self.__context).read().decode('ascii')
except urllib2.HTTPError as exception:
print('HTTPError = ' + str(exception.code))
except urllib2.URLError as exception:
Expand All @@ -88,19 +99,25 @@ def get_sid(self):
print('generic exception: ' + str(exception))
raise


challenge = get_challenge.split(
'<Challenge>')[1].split('</Challenge>')[0]
challenge_b = (
challenge + '-' + self.__password).encode().decode('iso-8859-1').encode('utf-16le')
challenge = get_challenge.split('<Challenge>')[1].split('</Challenge>')[0]
challenge_b = (challenge + '-' + self.__password).encode().decode('iso-8859-1').encode('utf-16le')

md5hash = hashlib.md5()
md5hash.update(challenge_b)

response_b = challenge + '-' + md5hash.hexdigest().lower()
get_sid = urllib2.urlopen('%s?response=%s' % (base_url, response_b)).read().decode('utf-8')
get_sid = urllib2.urlopen('%s?username=%s&response=%s' % (base_url, self.__username,response_b),context=self.__context).read().decode('utf-8')

self.sid = get_sid.split('<SID>')[1].split('</SID>')[0]


def logout(self):
url = u"%s&logout=1" % self.__homeauto_url_with_sid()
try:
urllib2.urlopen(url,context=self.__context)
except:
pass


def get_info(self):
"""Returns device info"""
return self.get_state_all()
Expand Down Expand Up @@ -187,3 +204,6 @@ def get_state_all(self):
for device in self.get_device_names().keys():
state_dict[device] = self.get_state(device)
return state_dict

def __del__(self):
self.logout()
59 changes: 14 additions & 45 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,59 +1,28 @@
PyDect200
======

[![Download format](http://img.shields.io/pypi/format/PyDect200.svg)](https://pypi.python.org/pypi/PY_DECT200/)
[![Downloads](http://img.shields.io/pypi/dm/PyDect200.svg)](https://pypi.python.org/pypi/PY_DECT200/)
[![License](http://img.shields.io/pypi/l/PyDect200.svg)](https://pypi.python.org/pypi/PY_DECT200/)
[![Latest Version](http://img.shields.io/pypi/v/PyDect200.svg)](https://pypi.python.org/pypi/PY_DECT200/)


Control the Fritz-AVM DECT200 (switch a electric socket)
and Fritz-AVM PowerLine 546E

### Install

```
pip install PyDect200
```

### Demo

```
git clone [email protected]:mperlet/PyDect200.git

./PyDect200/Example/PyDect200_Demo.py
```

### Example Code

```
In [1]: from PyDect200 import PyDect200
In [2]: f = PyDect200('fitzbox_password')
In [3]: f.get_device_names()
Out[3]: {'16': 'Beleuchtung', '17': 'Fernseher'}
In [4]: f.get_info()
Out[4]: {u'16': u'0', u'17': u'0'}
In [5]: f.switch_onoff(16,1)
Out[5]:
In [2]: f = PyDect200.PyDect200(fritzbox_pw, fritzbox_user, "/home/fritz.pem","https://fritz.box")
In [3]: f.get_sid() # get the sid will you get a new login
In [4]: f.get_device_names()
Out[4]: {'16': 'Beleuchtung', '17': 'Fernseher'}
In [5]: f.get_info()
Out[5]: {u'16': u'0', u'17': u'0'}
In [6]: f.switch_onoff(16,1)
Out[6]:
{u'DeviceID': u'16',
u'RequestResult': u'1',
u'Value': u'0',
u'ValueToSet': u'1'}
In [6]: f.get_power()
Out[6]: {u'16': 68.95, u'17': 0.0}
In [7]: f.get_power()
Out[7]: {u'16': 68.95, u'17': 0.0}
In [8]: f.logout()
```

### Tested with

* Python2.7 / Python3.4
* Fritzbox 7270
* FRITZ!OS: 06.05
* Python3.5
* Fritzbox 7390
* FRITZ!OS: 06.51
* AVM Dect200

******************

* Python2.7
* Fritzbox 7490
* FRITZ!OS: 6.36 Labor
* Dect200
* PowerLine 546E