Skip to content

Commit

Permalink
fix: copy and paste mistakes
Browse files Browse the repository at this point in the history
  • Loading branch information
kmherrmann committed Aug 10, 2023
1 parent fc3249a commit 28440d8
Show file tree
Hide file tree
Showing 2 changed files with 218 additions and 0 deletions.
116 changes: 116 additions & 0 deletions ory-actions/vpncheck-py/ipqs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Copyright © 2023 Ory Corp
# SPDX-License-Identifier: Apache-2.0

from flask import Flask, request, jsonify
import requests

import os

# load google cloud logging if running on GCP
if os.getenv('ENABLE_CLOUD_LOGGING', ''):
# set up the Google Cloud Logging python client library
import google.cloud.logging
client = google.cloud.logging.Client()
client.setup_logging()

# use Python’s standard logging library to send logs to GCP
import logging

app = Flask(__name__)

# Define the bearer token for authentication
BEARER_TOKEN = os.environ.get("BEARER_TOKEN")
IPQS_API_KEY = os.environ.get("IPQS_API_KEY")

if not BEARER_TOKEN or not IPQS_API_KEY:
raise ValueError("BEARER_TOKEN or IPQS_API_KEY not set in environment variables.")

@app.route("/vpncheck", methods=["POST"])
def handle_vpncheck():
return vpncheck(request)

def vpncheck(request):
# Check for bearer token authentication
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return jsonify({"error": "Unauthorized"}), 401

provided_token = auth_header.split("Bearer ")[1]
if provided_token != BEARER_TOKEN:
return jsonify({"error": "Unauthorized"}), 401

# Parse the JSON payload and extract the IP address
data = request.get_json()
logging.info(f"request: {data}")
ip_address = data.get("ip_address")
if not ip_address:
return error_response("Cannot determine Client IP address")

# Call vpnapi.io to check the IP address
# if the API fails, we permit by default
try:
vpn_result = query_ipqs(ip_address)
except Exception as e:
return jsonify({"warning": "Unable to check VPN: ", "details": str(e)}), 200

# Check the response from IPQS - based on example from https://www.ipqualityscore.com/documentation/proxy-detection/overview

if not 'success' in vpn_result or vpn_result['success'] == False:
logging.info(f"IPQS failed, result: {vpn_result}")
return error_response("Can't verify IP address")

if "fraud_score" in vpn_result and vpn_result["fraud_score"] >= 80:
logging.info(f"Blocked: Fraud score {vpn_result['fraud_score']}")
return error_response("Authentication failed: IP address cannot be verified")
if "vpn" in vpn_result and vpn_result["vpn"] == True:
logging.info(f"Blocked: VPN")
return error_response("Authentication failed: Please disable your VPN.")
if "tor" in vpn_result and vpn_result["tor"] == True:
logging.info(f"Blocked: Tor")
return error_response("Authentication failed: Please disable Tor.")

if (
"iso_code" in vpn_result
and vpn_result["iso_code"] == "ru"
):
logging.info(f"geoblock: {vpn_result['iso_code']}")
return error_response("Request blocked: Geolocation")

# Return the result as success or error details
return jsonify(vpn_result), 200


def error_response(msg):
return jsonify({"messages": [{ "messages": [{ "text": msg }] }]}), 400


def query_ipqs(ip_address):
# Implement the logic to call ipqs.com and retrieve the result
# You can use libraries like requests or httpx for making HTTP requests
# Return the response as a dictionary
# For example:

ipqs_parameters = {
#'user_agent' : header_items['User-Agent'],
#'user_language' : header_items['Accept-Language'].split(',')[0],
'strictness' : 1,
# You may want to allow public access points like coffee shops, schools, corporations, etc...
'allow_public_access_points' : 'true',
# Reduce scoring penalties for mixed quality IP addresses shared by good and bad users.
'lighter_penalties' : 'false'
}


url = f"https://www.ipqualityscore.com/api/json/ip/{IPQS_API_KEY}/{ip_address}"
response = requests.get(url, params = ipqs_parameters, timeout = 1.5)


if response.status_code != 200:
raise Exception(f"IPQS returned {response.status_code}")

result = response.json()
return result


if __name__ == "__main__":
app.run()
102 changes: 102 additions & 0 deletions ory-actions/vpncheck-py/vpnapi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Copyright © 2023 Ory Corp
# SPDX-License-Identifier: Apache-2.0

from flask import Flask, request, jsonify
import requests

import os

# load google cloud logging if running on GCP
if os.getenv('ENABLE_CLOUD_LOGGING', ''):
# set up the Google Cloud Logging python client library
import google.cloud.logging
client = google.cloud.logging.Client()
client.setup_logging()

# use Python’s standard logging library to send logs to GCP
import logging

app = Flask(__name__)

# Define the bearer token for authentication
BEARER_TOKEN = os.environ.get("BEARER_TOKEN")
VPNAPIIO_API_KEY = os.environ.get("VPNAPIIO_API_KEY")

if not BEARER_TOKEN or not VPNAPIIO_API_KEY:
raise ValueError("BEARER_TOKEN or VPNAPIIO_API_KEY not set in environment variables.")

@app.route("/vpncheck", methods=["POST"])
def handle_vpncheck():
return vpncheck(request)

def vpncheck(request):
# Check for bearer token authentication
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return jsonify({"error": "Unauthorized"}), 401

provided_token = auth_header.split("Bearer ")[1]
if provided_token != BEARER_TOKEN:
return jsonify({"error": "Unauthorized"}), 401

# Parse the JSON payload and extract the IP address
data = request.get_json()
logging.info(f"request: {data}")
ip_address = data.get("ip_address")
if not ip_address:
return error_response("Cannot determine Client IP address")

# Call vpnapi.io to check the IP address
# if the API fails, we permit by default
try:
vpn_result = query_vpn_io(ip_address)
except Exception as e:
return jsonify({"warning": "Unable to check VPN: ", "details": str(e)}), 200

# Check the response from vpnapi.io
if "error" in vpn_result and vpn_result["error"] == "Blocked":
return error_response("Request blocked: Blocked by VPN API")

if "security" in vpn_result:
security_info = vpn_result["security"]
if "vpn" in security_info and security_info["vpn"] == True:
logging.info(f"vpn block: {security_info['vpn']}")
return error_response("Request blocked: VPN")
if "tor" in security_info and security_info["tor"] == True:
logging.info(f"tor block: {security_info['tor']}")
return error_response("Request blocked: Tor")

if (
"location" in vpn_result
and "country_code" in vpn_result["location"]
and vpn_result["location"]["country_code"] == "RU"
):
logging.info(f"geoblock: {vpn_result['location']['country_code']}")
return error_response("Request blocked: Geolocation")

# Return the result as success or error details
return jsonify(vpn_result), 200


def error_response(msg):
return jsonify({"messages": [{ "messages": [{ "text": msg }] }]}), 400


def query_vpn_io(ip_address):
# Implement the logic to call vpnapi.io and retrieve the result
# You can use libraries like requests or httpx for making HTTP requests
# Return the response as a dictionary
# For example:

url = f"https://vpnapi.io/api/{ip_address}?key={VPNAPIIO_API_KEY}"
response = requests.get(url, timeout=1.5)
if response.status_code != 200:
raise Exception(f"vpnapi.io returned {response.status_code}")

result = response.json()

return result


if __name__ == "__main__":
app.run()

0 comments on commit 28440d8

Please sign in to comment.