Skip to content

Commit

Permalink
Add logviewer as built-in plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
raidensakura committed Jun 2, 2023
1 parent 884c845 commit a637ef4
Show file tree
Hide file tree
Showing 36 changed files with 11,947 additions and 1 deletion.
5 changes: 4 additions & 1 deletion plugins/@local/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
# https://www.howtogeek.com/devops/how-to-set-up-gitignore-as-a-whitelist/
*
!.gitignore
!*/
!.gitignore
!logviewer/**
150 changes: 150 additions & 0 deletions plugins/@local/logviewer/core/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
from functools import wraps

import aiohttp
from aiohttp_session import get_session
from aiohttp_session.cookie_storage import EncryptedCookieStorage
import os

from urllib.parse import urlencode

OAUTH2_CLIENT_ID = os.getenv("OAUTH2_CLIENT_ID")
OAUTH2_CLIENT_SECRET = os.getenv("OAUTH2_CLIENT_SECRET")
OAUTH2_REDIRECT_URI = os.getenv("OAUTH2_REDIRECT_URI")

API_BASE = "https://discordapp.com/api/"
AUTHORIZATION_BASE_URL = f"{API_BASE}/oauth2/authorize"
TOKEN_URL = f"{API_BASE}/oauth2/token"
ROLE_URL = f"{API_BASE}/guilds/{{guild_id}}/members/{{user_id}}"

from core.models import getLogger

logger = getLogger(__name__)

client_session = aiohttp.ClientSession()


def authentication(func):
async def wrapper(self, request, **kwargs):
if not self.config.using_oauth:
result = await func(self, request, **kwargs)
return result

session = await get_session(request)
if not session.get("user"):
session["last_visit"] = str(request.url)
raise aiohttp.web.HTTPFound("/login")

user = session.get("user")

whitelist = self.bot.config.get("oauth_whitelist", [])

roles = await get_user_roles(user["id"])

if (
int(user["id"]) in whitelist or
"everyone" in whitelist or
any(int(r) in whitelist for r in roles)
):
result = await func(self, request, **kwargs)
return result

result = await self.render_template("unauthorized", request)

return result
return wrapper


def authrequired():
def decorator(func):
@wraps(func)
async def wrapper(self, request):
if self.config.using_oauth:
session = await get_session(request)
if not session.get("user"):
session["last_visit"] = str(request.url)
raise aiohttp.web.HTTPFound("/login")

user = session.get("user")
logger.info(user)

whitelist = self.bot.config.get("oauth_whitelist", [])

roles = await get_user_roles(user["id"])

if (
int(user["id"]) not in whitelist and
"everyone" not in whitelist and
any(int(r) not in whitelist for r in roles)
):
logger.warn("Unauthorized access detected")
return await self.render_template("unauthorized")

return wrapper

return decorator

async def get_user_info(token):
headers = {"Authorization": f"Bearer {token}"}
async with client_session.get(
f"{API_BASE}/users/@me", headers=headers
) as resp:
_r = await resp.json()
logger.info(f"UINFO: {_r}")
return await resp.json()

async def get_user_roles(user_id):
_guild_id = os.getenv("GUILD_ID", None)
_bot_token = os.getenv("BOT_TOKEN", None)
url = ROLE_URL.format(guild_id=_guild_id, user_id=user_id)
headers = {"Authorization": f"Bot {_bot_token}"}
async with client_session.get(url, headers=headers) as resp:
user = await resp.json()
return user.get("roles", [])

async def fetch_token(code):
data = {
"code": code,
"grant_type": "authorization_code",
"redirect_uri": OAUTH2_REDIRECT_URI,
"client_id": OAUTH2_CLIENT_ID,
"client_secret": OAUTH2_CLIENT_SECRET,
"scope": "identify",
}

async with client_session.post(TOKEN_URL, data=data) as resp:
json = await resp.json()
return json

async def login(request):
session = await get_session(request)
if not session.get("last_visit"):
session["last_visit"] = "/"

data = {
"scope": "identify",
"client_id": OAUTH2_CLIENT_ID,
"response_type": "code",
"redirect_uri": OAUTH2_REDIRECT_URI,
}

raise aiohttp.web.HTTPFound(f"{AUTHORIZATION_BASE_URL}?{urlencode(data)}")

async def oauth_callback(request):
session = await get_session(request)

code = request.query.get("code")
token = await fetch_token(code)
access_token = token.get("access_token")
if access_token is not None:
session["access_token"] = access_token
session["user"] = await get_user_info(access_token)
url = "/"
if "last_visit" in session:
url = session["last_visit"]
raise aiohttp.web.HTTPFound(url)
raise aiohttp.web.HTTPFound("/login")

async def logout(request):
session = await get_session(request)
session.invalidate()
raise aiohttp.web.HTTPFound("/")
151 changes: 151 additions & 0 deletions plugins/@local/logviewer/core/formatter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import html
import re
import base64


def format_content_html(content: str, allow_links: bool = False) -> str:
# HTML-encode content

def encode_codeblock(m):
encoded = base64.b64encode(m.group(1).encode()).decode()
return "\x1AM" + encoded + "\x1AM"

# Encode multiline codeblocks (```text```)
content = re.sub(r"```+((?:[^`]*?\n)?(?:[^`]+))\n?```+", encode_codeblock, content)

content = html.escape(content)

def encode_inline_codeblock(m):
encoded = base64.b64encode(m.group(1).encode()).decode()
return "\x1AI" + encoded + "\x1AI"

# Encode inline codeblocks (`text`)
content = re.sub(r"`([^`]+)`", encode_inline_codeblock, content)

# Encode inline blockquotes (> test)
# Multiline blockquotes (>>> test) are saved as single in Mongo (> test)
content = re.sub(r"(&gt; )([^\n]+)", r"<blockquote>\2</blockquote>", content)

# Encode links
if allow_links:

def encode_link(m):
encoded_1 = base64.b64encode(m.group(1).encode()).decode()
encoded_2 = base64.b64encode(m.group(2).encode()).decode()
return "\x1AL" + encoded_1 + "|" + encoded_2 + "\x1AL"

content = re.sub(r"\[(.*?)\]\((.*?)\)", encode_link, content)

def encode_url(m):
encoded = base64.b64encode(m.group(1).encode()).decode()
return "\x1AU" + encoded + "\x1AU"

# Encode URLs
content = re.sub(
r"(\b(?:(?:https?|ftp|file)://|www\.|ftp\.)(?:\([-a-zA-Z0"
r"-9+&@#/%?=~_|!:,\.\[\];]*\)|[-a-zA-Z0-9+&@#/%?=~_|!:,\."
r"\[\];])*(?:\([-a-zA-Z0-9+&@#/%?=~_|!:,\.\[\];]*\)|[-a-z"
r"A-Z0-9+&@#/%=~_|$]))",
encode_url,
content,
)

# Process bold (**text**)
content = re.sub(r"(\*\*)(?=\S)(.+?[*_]*)(?<=\S)\1", r"<b>\2</b>", content)

# Process underline (__text__)
content = re.sub(r"(__)(?=\S)(.+?)(?<=\S)\1", r"<u>\2</u>", content)

# Process italic (*text* or _text_)
content = re.sub(r"(\*|_)(?=\S)(.+?)(?<=\S)\1", r"<i>\2</i>", content)

# Process strike through (~~text~~)
content = re.sub(r"(~~)(?=\S)(.+?)(?<=\S)\1", r"<s>\2</s>", content)

def decode_inline_codeblock(m):
decoded = base64.b64decode(m.group(1).encode()).decode()
return '<span class="pre pre--inline">' + decoded + "</span>"

# Decode and process inline codeblocks
content = re.sub("\x1AI(.*?)\x1AI", decode_inline_codeblock, content)

# Decode and process links
if allow_links:

def decode_link(m):
encoded_1 = base64.b64decode(m.group(1).encode()).decode()
encoded_2 = base64.b64decode(m.group(2).encode()).decode()
return '<a href="' + encoded_2 + '">' + encoded_1 + "</a>"

# Potential bug, may need to change to: '\x1AL(.*?)\|(.*?)\x1AL'
content = re.sub("\x1AL(.*?)\\|(.*?)\x1AL", decode_link, content)

def decode_url(m):
decoded = base64.b64decode(m.group(1).encode()).decode()
return '<a href="' + decoded + '">' + decoded + "</a>"

# Decode and process URLs
content = re.sub("\x1AU(.*?)\x1AU", decode_url, content)

# Process new lines
content = content.replace("\n", "<br>")

def decode_codeblock(m):
decoded = base64.b64decode(m.group(1).encode()).decode()
match = re.match("^([^`]*?\n)?([^`]+)$", decoded)
lang = match.group(1) or ""
if not lang.strip(" \n\r"):
lang = "plaintext"
else:
lang = lang.strip(" \n\r")

result = html.escape(match.group(2))
return f'<div class="pre pre--multiline {lang}">{result}' "</div>"

# Decode and process multiline codeblocks
content = re.sub("\x1AM(.*?)\x1AM", decode_codeblock, content)

# Meta mentions (@everyone)
content = content.replace("@everyone", '<span class="mention">@everyone</span>')

# Meta mentions (@here)
content = content.replace("@here", '<span class="mention">@here</span>')

# User mentions (<@id> and <@!id>)
content = re.sub(
r"(&lt;@!?(\d+)&gt;)", r'<span class="mention" title="\2">\1</span>', content
)

# Channel mentions (<#id>)
content = re.sub(r"(&lt;#\d+&gt;)", r'<span class="mention">\1</span>', content)

# Role mentions (<@&id>)
content = re.sub(
r"(&lt;@&amp;(\d+)&gt;)", r'<span class="mention">\1</span>', content
)

# Custom emojis (<:name:id>)
is_jumboable = not re.sub(r"&lt;(:.*?:)(\d*)&gt;", "", content)
emoji_class = "emoji emoji--large" if is_jumboable else "emoji"
content = re.sub(
r"&lt;(:.*?:)(\d*)&gt;",
r'<img class="'
+ emoji_class
+ r'" title="\1" src="https://cdn.discordapp.com/'
+ r'emojis/\2.png" alt="\1">',
content,
)

# Custom animated emojis (<a:name:id>)
is_jumboable_animated = not re.sub(r"&lt;(a:.*?:)(\d*)&gt;", "", content)
emoji_class_animated = "emoji emoji--large" if is_jumboable_animated else "emoji"
content = re.sub(
r"&lt;(a:.*?:)(\d*)&gt;",
r'<img class="'
+ emoji_class_animated
+ r'" title="\1" src="https://cdn.discordapp.com/'
+ r'emojis/\2.gif" alt="\1">',
content,
)

return content
Loading

0 comments on commit a637ef4

Please sign in to comment.