Skip to content

Commit

Permalink
Updated code working with reverse flow
Browse files Browse the repository at this point in the history
  • Loading branch information
rraks committed Oct 21, 2019
1 parent e54b7ea commit 68de8a5
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 77 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ tmux/
*.log
*.pyc
.fuse*
*.rdb
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Distributed Video-IoT framework to support modern demands for high fidelity, hig
3. Bring up the two containers for the main server (lb) and the database(db). \
` docker-compose up lb db`
4. Exec into the lb container and run the server, either as a daemon or on a tmux session. \
` docker exec -it vidiot_lb_l` \
` docker exec -it vid-iot_lb_l` \
` cd vidiot ` \
` ./scripts/start_all_tmux.sh`
5. The video server should be running and can be attached to by issuing \
Expand Down
8 changes: 4 additions & 4 deletions scripts/streams.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
[
{
"id": "iisc_108",
"ip": "rtsp://10.18.20.108/h264"
"stream_id": "iisc_108",
"stream_ip": "rtsp://10.18.20.108/h264"
},
{
"id": "iisc_107",
"ip": "rtsp://10.18.20.107/h264"
"stream_id": "iisc_107",
"stream_ip": "rtsp://10.18.20.107/h264"
}
]
23 changes: 11 additions & 12 deletions src/HTTPserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def userfunc():
ret = monitorTaskResult(res)
if ret["topic"] == "lbsresponse/user/del" and ret["msg"] is True:
return Response(json.dumps({"username": user_name}),
status=204, mimetype="application/json")
status=200, mimetype="application/json")
else:
return Response(json.dumps({}), status=404,
mimetype="application/json")
Expand Down Expand Up @@ -249,13 +249,13 @@ def stream():
If failed - 404 {}
'''
data = request.get_json(force=True)
stream_ip = data["stream_ip"]
stream_id = data["stream_id"]
logger.info("Deleted Stream " + str(stream_id))
''' TODO: make schema changes in accordance to schema '''
msg = {"stream_id": stream_id}
res = lbc.DeleteStream.delay(json.dumps(msg))
ret = monitorTaskResult(res)
print(ret)

if isinstance(ret, list):
m = [d for d in ret if d["topic"] == "lbsresponse/stream/del"][0]
Expand Down Expand Up @@ -321,14 +321,12 @@ def origin():
"num_clients": 0}
res = lbc.InsertOrigin.delay(json.dumps(msg))
ret = monitorTaskResult(res)
if ret["topic"] == "lbsresponse/origin/add":
msg = ret["msg"]
if msg:
return Response(json.dumps({}),
status=200, mimetype="application/json")
else:
return Response(json.dumps({}), status=409,
mimetype="application/json")
if ret["topic"] == "lbsresponse/origin/add" and ret["msg"] is True:
return Response(json.dumps({}),
status=200, mimetype="application/json")
else:
return Response(json.dumps({}), status=409,
mimetype="application/json")

elif request.method == "DELETE":
'''
Expand Down Expand Up @@ -419,11 +417,12 @@ def dist():
If failed - 404 {}
'''
data = request.get_json(force=True)
dist_id = data["id"]
logger.info("Deleted Distribution "+str(dist_id))
dist_id = data["dist_id"]
logger.info("Deleted Distribution " + str(dist_id))
msg = {"dist_id": dist_id}
res = lbc.DeleteDist.delay(json.dumps(msg))
ret = monitorTaskResult(res)
print(ret)
if isinstance(ret, list):
m = [d for d in ret if d["topic"] == "lbsresponse/dist/del"][0]
if(m["msg"] is True):
Expand Down
15 changes: 9 additions & 6 deletions src/loadbalancercelery.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@ def findAll(self, doc=None, args=None):
return list(self.collection.find({}, arg))

def delete(self, doc):
self.collection.delete_one(doc)
res = self.collection.delete_one(doc)
return res.deleted_count

def deleteMany(self, doc):
self.collection.delete_many(doc)
res = self.collection.delete_many(doc)
return res.deleted_count

def count(self):
return self.collection.count()
Expand Down Expand Up @@ -193,7 +195,7 @@ def DeleteOrigin(msg):
streamsTable.deleteMany({"from_id": msg["origin_id"]})
logger.info("Origin Deleted----> ID:" + " ID:"+str(msg["origin_id"]))
return [{"topic": "lbsresponse/origin/del", "msg": True},
{"topic": "origin/ffmpeg/killall", "msg": msg}]
{"topic": "origin/ffmpeg/killall", "msg": json.dumps(msg)}]
else:
return {"topic": "lbsresponse/origin/del", "msg": False}

Expand Down Expand Up @@ -398,6 +400,7 @@ def InsertStream(msg):
for origin in origins:
if (origin["num_clients"] < bestNumClients):
bestOrigin = origin
bestNumClients = origin["num_clients"]
origin = bestOrigin

''' TODO: Add 'status' to the streamsTable '''
Expand Down Expand Up @@ -451,12 +454,12 @@ def DeleteStream(msg):
msg = json.loads(msg)
killlist = []
streams = streamsTable.findAll(msg)
logger.info("Deleting " + msg["stream_id"], " from", )
logger.info("Deleting " + msg["stream_id"])
if len(streams) is 0:
logger.info("Stream " + msg["stream_id"], " not found")
logger.info("Stream " + msg["stream_id"] + " not found")
return {"topic": "lbsresponse/stream/del", "msg": False}
else:
killlist = ffmpegProcsTable.findAll(msg)
killlist = json.dumps(ffmpegProcsTable.findAll(msg))
streamsTable.delete(msg)
ffmpegProcsTable.deleteMany(msg)
return [{"topic": "lbsresponse/stream/del", "msg": True},
Expand Down
162 changes: 108 additions & 54 deletions tests/testSuite.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ def __init__(self, HTTP_IP, HTTP_PORT, ROOT_UNAME, ROOT_PASSWD):
uname = os.environ["MONGO_INITDB_ROOT_USERNAME"]
pwd = os.environ["MONGO_INITDB_ROOT_PASSWORD"]
url = os.environ["MONGO_URL"]
mongoclient = pymongo.MongoClient("mongodb://" + uname + ":" + pwd +
"@" + url + ":27017/", connect=False)
mongoclient.drop_database("vid-iot")
self.mongoclient = pymongo.MongoClient("mongodb://" + uname + ":" + pwd +
"@" + url + ":27017/", connect=False)
self.mongoclient.drop_database("vid-iot")

def createUser(self, user_cred=None):
''' Root functions '''
Expand All @@ -35,86 +35,77 @@ def createUser(self, user_cred=None):
resp = requests.post(reqLink, data=json.dumps(user_cred))
else:
resp = requests.post(reqLink, data=json.dumps(self.test_cred))
print(resp.status_code)
return resp.status_code
return resp

def deleteUser(self, user_cred=None):
reqLink = self.root_link + "/user"
if(user_cred is not None):
resp = requests.delete(reqLink, data=json.dumps(user_cred))
else:
resp = requests.delete(reqLink, data=json.dumps(self.test_cred))
print(resp.status_code)
return resp.status_code
return resp

def allUsers(self):
reqLink = self.root_link + "/user"
resp = requests.get(reqLink)
print(resp.json())
return len(resp.json())
return resp

def addOrigin(self, origin_id, origin_ip):
reqLink = self.root_link + "/origin"
d = json.dumps({"origin_id": origin_id, "origin_ip": origin_ip})
resp = requests.post(reqLink, data=d, headers=self.test_cred)
print(resp.status_code)
return resp.status_code
return resp

def deleteOrigin(self, origin_id, origin_ip):
def deleteOrigin(self, origin_id):
reqLink = self.root_link + "/origin"
d = json.dumps({"origin_id": origin_id})
resp = requests.delete(reqLink, data=d, headers=self.test_cred)
return resp.status_code
return resp

def allOrigin(self):
reqLink = self.root_link + "/origin"
resp = requests.get(reqLink, headers=self.test_cred)
print(resp.json())
return len(resp.json())
return resp

def addDist(self, dist_id, dist_ip):
reqLink = self.root_link + "/dist"
d = json.dumps({"dist_id": dist_id, "dist_ip": dist_ip})
resp = requests.post(reqLink, data=d, headers=self.test_cred)
return resp.status_code
return resp

def deleteDist(self, dist_id, dist_ip):
def deleteDist(self, dist_id):
reqLink = self.root_link + "/dist"
d = json.dumps({"dist_id": dist_id})
resp = requests.delete(reqLink, data=d, headers=self.test_cred)
return resp.status_code
return resp

def allDist(self):
reqLink = self.root_link + "/dist"
resp = requests.get(reqLink, headers=self.test_cred)
print(resp.json())
return resp.status_code
return resp

def addStream(self, stream_id, stream_ip):
reqLink = self.root_link + "/streams"
d = json.dumps({"stream_id": stream_id, "stream_ip": stream_ip})
resp = requests.post(reqLink, data=d, headers=self.test_cred)
print(resp.json())
return resp.status_code
return resp

def deleteStream(self, stream_id, stream_ip):
def deleteStream(self, stream_id):
reqLink = self.root_link + "/streams"
d = json.dumps({"stream_id": stream_id, "stream_ip": stream_ip})
d = json.dumps({"stream_id": stream_id})
resp = requests.delete(reqLink, data=d, headers=self.test_cred)
return resp.status_code
return resp

def allStreams(self):
reqLink = self.root_link + "/streams"
resp = requests.get(reqLink, headers=self.test_cred)
print(resp.json())
return resp.status_code
return resp

def reqStream(self, stream_id):
reqLink = self.root_link + "/request"
d = json.dumps({"stream_id": stream_id})
resp = requests.post(reqLink, data=d, headers=self.test_cred)
print(resp.json())
return resp.status_code
return resp


class VidTest(unittest.TestCase):
Expand Down Expand Up @@ -147,7 +138,6 @@ def __init__(self, *args, **kwargs):
self.ROOT_uname, self.ROOT_passwd)

def test_user(self):
print("Testing User creation functions ")
vec = [{"username": u, "password": u} for u in ["test1", "test2", "test3"]]
succ_create = 0.
succ_get = 0.
Expand All @@ -163,35 +153,99 @@ def test_user(self):
for v in vec:
if self.vs.deleteUser(v) == 204:
succ_delete += 1
print("Success of creation", succ_create/num)
print("Success of Get", succ_get/num)
print("Success of delete", succ_delete/num)

def test_simpleFlow(self):
print("Creating user")
self.vs.createUser()
print("User Created")
print("Showing all Users")
self.vs.allUsers()
print("Adding Origin")
self.vs.addOrigin(self.origin_id, self.origin_ip)
print("Showing all origins")
self.vs.allOrigin()
print("Adding dist")
self.vs.addDist(self.dist_id, self.dist_ip)
print("Showing all dists")
self.vs.allDist()
'''
Forward Pass
'''

''' Create user '''
resp = self.vs.createUser()
self.assertEqual(resp.status_code, 200)
''' Show all users (only 1) '''
resp = self.vs.allUsers()
self.assertEqual(len(resp.json()), 1)
self.assertEqual(resp.json()[0],
{"username": self.vs.test_cred["username"]})
self.assertEqual(resp.status_code, 200)
''' Add origin server '''
resp = self.vs.addOrigin(self.origin_id, self.origin_ip)
self.assertEqual(resp.status_code, 200)
''' Show all origin servers '''
resp = self.vs.allOrigin()
self.assertEqual(resp.json()[0]["origin_ip"], self.origin_ip)
self.assertEqual(resp.json()[0]["origin_id"], self.origin_id)
''' Add a dist server '''
resp = self.vs.addDist(self.dist_id, self.dist_ip)
self.assertEqual(resp.status_code, 200)
''' Show all dists servers '''
resp = self.vs.allDist()
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.json()[0]["dist_ip"], self.dist_ip)
self.assertEqual(resp.json()[0]["dist_id"], self.dist_id)
''' Add streams '''
for stream in self.streams:
print("Adding stream", stream["id"])
self.vs.addStream(stream["id"], stream["ip"])
self.vs.allStreams()
resp = self.vs.addStream(stream["stream_id"], stream["stream_ip"])
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.json()["info"], "inserting")
''' Show all streams '''
resp = self.vs.allStreams()
stream_ids = [s["stream_id"] for s in self.streams]
resp_streams = [s["stream_id"] for s in resp.json()]
self.assertEqual(resp.status_code, 200)
for stream in resp_streams:
self.assertTrue(stream in stream_ids)
''' Request for stream '''
for stream in self.streams:
print("Requesting stream", stream)
self.vs.reqStream(stream["id"])
print("Re-requesting")
resp = self.vs.reqStream(stream["stream_id"])
self.assertEqual(resp.status_code, 200)
resp_json = resp.json()
self.assertFalse(resp_json["info"] == "unavailable")
''' Optional second request '''
# for stream in self.streams:
# self.vs.reqStream(stream["stream_id"])

'''
Reverse Pass
'''

''' Delete all streams '''
for stream in self.streams:
print("Requesting stream", stream)
self.vs.reqStream(stream["id"])
resp = self.vs.deleteStream(stream["stream_id"])
self.assertEqual(resp.status_code, 200)
resp_json = resp.json()
''' Show all streams '''
resp = self.vs.allStreams()
stream_ids = [s["stream_id"] for s in self.streams]
resp_streams = [s["stream_id"] for s in resp.json()]
self.assertEqual(resp.status_code, 200)
for stream in resp_streams:
self.assertFalse(stream in stream_ids)
''' Delete dist server '''
resp = self.vs.deleteDist(self.dist_id)
self.assertEqual(resp.status_code, 200)
''' Show all dists servers '''
resp = self.vs.allDist()
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.json(), [])
''' Delete origin server '''
resp = self.vs.deleteOrigin(self.origin_id)
self.assertEqual(resp.status_code, 200)
''' Show all origin servers '''
resp = self.vs.allOrigin()
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.json(), [])
''' Delete user '''
resp = self.vs.deleteUser()
self.assertEqual(resp.status_code, 200)
''' Show all users (0) '''
resp = self.vs.allUsers()
self.assertEqual(len(resp.json()), 0)
self.assertEqual(resp.status_code, 200)

def tearDown(self):
''' Cleanup '''
self.vs.mongoclient.drop_database("vid-iot")


if __name__ == '__main__':
Expand Down

0 comments on commit 68de8a5

Please sign in to comment.