-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgithub.py
114 lines (101 loc) · 3.59 KB
/
github.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# -*- encoding: utf-8 -*-
from requests import Response
from distutils.util import strtobool
from http import HTTPStatus
try:
from .http import http_request
except ImportError:
from odoo.addons.oneshare_utils.http import http_request
import os
# Values of the ENV_GITHUB_ACCESS_TOKEN and ENV_GITHUB_OWNER environment
# variables, read once at import time; each is "" when the variable is unset.
ENV_GITHUB_ACCESS_TOKEN = os.environ.get("ENV_GITHUB_ACCESS_TOKEN", "")
ENV_GITHUB_OWNER = os.environ.get("ENV_GITHUB_OWNER", "")
class GithubProvider(object):
    """Small client for the GitHub REST endpoints used by this module.

    It can list repositories (``/user/repos``), list the tags of one
    repository (``/repos/<owner>/<repo>/tags``) and post to the repository
    dispatch endpoint (``/repos/<owner>/<repo>/dispatches``).

    NOTE(review): the ``_do_*`` methods are wrapped by ``http_request``,
    which is defined outside this file; presumably it performs the actual
    HTTP call using the ``url``/``headers``/``params`` keyword arguments —
    confirm against the decorator.
    """

    def __init__(self, token=ENV_GITHUB_ACCESS_TOKEN):
        """Store the access token and the endpoint URLs.

        :param token: GitHub access token sent in the ``Authorization``
            header; defaults to the module constant
            ``ENV_GITHUB_ACCESS_TOKEN``.
        """
        self._access_token = token
        # Plain %-templates: the two ``%s`` slots are filled with
        # (owner, repo) when a request is built.
        self._repo_event_url_tmpl = "https://api.github.com/repos/%s/%s/dispatches"
        self._repo_get_tags_url_tmpl = "https://api.github.com/repos/%s/%s/tags"
        self._get_repos = "https://api.github.com/user/repos"

    @staticmethod
    def open():
        """Return ``True`` unconditionally."""
        return True

    def invoke_api(self, evt: str = "", *args, **kwargs):
        """Route *evt* to the matching API method.

        Supported events and the keyword arguments they read:

        * ``"repo_dispatch"`` -- ``owner``, ``repo``, ``client_payload``
        * ``"list_repos"`` -- none
        * ``"list_tags"`` -- ``owner``, ``repo``

        :returns: whatever the selected method returns, or ``None`` when
            *evt* is not one of the supported names.
        """
        owner = kwargs.get("owner", "")
        repo = kwargs.get("repo", "")
        if evt == "repo_dispatch":
            # A missing payload must be None (not ""), so that
            # trigger_repo_dispatch_evt substitutes an empty dict instead of
            # forwarding a string as the payload.
            payload = kwargs.get("client_payload")
            return self.trigger_repo_dispatch_evt(owner, repo, payload)
        if evt == "list_repos":
            return self.get_repos()
        if evt == "list_tags":
            return self.get_repo_tags(owner, repo)
        return None

    def get_repo_tags(self, owner, repo):
        """Return the tag names of ``owner/repo``.

        :returns: list of the ``name`` field of every entry in the response,
            or ``None`` when *owner* or *repo* is empty, when the call did
            not yield a ``Response``, or when the status is not 200.
        """
        if not repo or not owner:
            return None
        url = self._repo_get_tags_url_tmpl % (owner, repo)
        resp = self._do_get_repo_tags(url=url, headers=self._auth_headers())
        return self._collect_field(resp, "name")

    def get_repos(self):
        """Return the ``full_name`` of every repository in the response.

        The request is sent with ``sort=updated``.

        :returns: list of names, or ``None`` when the call did not yield a
            ``Response`` or the status is not 200.
        """
        resp = self._do_get_all_repos(
            url=self._get_repos,
            headers=self._auth_headers(),
            params={"sort": "updated"},
        )
        return self._collect_field(resp, "full_name")

    @http_request(method="get")
    def _do_get_repo_tags(self, *args, **kwargs):
        """GET hook for the tags endpoint; the undecorated body returns None."""
        return

    @http_request(method="get")
    def _do_get_all_repos(self, *args, **kwargs):
        """GET hook for the repos endpoint; the undecorated body returns {}."""
        data = {}
        return data

    def trigger_repo_dispatch_evt(self, owner="", repo="", client_payload: dict = None):
        """Post a dispatch event to ``owner/repo``.

        :param owner: repository owner; the call is skipped when empty.
        :param repo: repository name; the call is skipped when empty.
        :param client_payload: payload forwarded with the event; ``None``
            is replaced by an empty dict.
        :returns: the result of the decorated request method, or ``None``
            when *owner* or *repo* is empty.
        """
        if client_payload is None:
            client_payload = {}
        if not repo or not owner:
            return None
        url = self._repo_event_url_tmpl % (owner, repo)
        return self._do_trigger_repo_dispatch_evt(
            url=url, headers=self._auth_headers(), client_payload=client_payload
        )

    @http_request()
    def _do_trigger_repo_dispatch_evt(self, *args, **kwargs):
        """Build the dispatch body from ``event_type`` and ``client_payload``.

        ``event_type`` falls back to ``"repo_dispatch"`` and
        ``client_payload`` to an empty dict when absent from *kwargs*.
        NOTE(review): presumably ``http_request`` sends the returned dict as
        the request body — confirm against the decorator.
        """
        return {
            "event_type": kwargs.get("event_type", "repo_dispatch"),
            "client_payload": kwargs.get("client_payload", {}),
        }

    def _auth_headers(self):
        """Return the Accept/Authorization headers shared by every request."""
        return {
            "Accept": "application/vnd.github.v3+json",
            "Authorization": f"token {self._access_token}",
        }

    @staticmethod
    def _collect_field(resp, field):
        """Return ``item.get(field)`` for each item of a 200 JSON response.

        Returns ``None`` when *resp* is not a ``Response`` or its status
        code is not ``HTTPStatus.OK``.
        """
        if not isinstance(resp, Response) or resp.status_code != HTTPStatus.OK:
            return None
        return [item.get(field) for item in resp.json()]