# -*- coding: utf-8 -*-
# @Time : 2023/3/26 19:42
# @Author : Euclid-Jie
# @File : WeiboClassV2.py
import json
import time
from urllib.parse import urlencode

import pandas as pd
from tqdm import tqdm

from Euclidweibo import *


class WeiboClassV2:
    def __init__(
        self, keyWord=None, method=None, baseUrl=None, Mongo=True, proxies=False
    ):
        self.keyWord = keyWord
        self.UrlList = None
        # use the Weibo search endpoint unless a custom baseUrl is supplied
        self.baseUrl = baseUrl if baseUrl else "https://s.weibo.com/weibo?"
        self.method = method
        self.Mongo = Mongo
        self.proxies = proxies
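
    # Parameter notes (inferred from how the attributes are used below):
    #   keyWord - keyword fed into the s.weibo.com search page
    #   method  - stored but not used by this class
    #   baseUrl - search endpoint; defaults to https://s.weibo.com/weibo?
    #   Mongo   - True writes records to MongoDB, False to a local CSV file
    #   proxies - passed through to the Euclidweibo request helpers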

    def UrlFormat(self, keyWords, timeBegin: str, timeEnd: str, page: int):
        """
        :param keyWords: search keyword
        :param timeBegin: start time, formatted like "2023-01-28-00"
        :param timeEnd: end time, formatted like "2023-02-10-00"
        :param page: page number
        :return: targetUrl
        """
        query = {
            "q": keyWords,
            "typeall": 1,
            "suball": 1,
            "timescope": "custom:{}:{}".format(timeBegin, timeEnd),
            "Refer": "g",
            "page": page,
        }
        targetUrl = self.baseUrl + urlencode(query)
        return targetUrl
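
    # Example (illustrative, using the keyword and time span from the
    # __main__ block below):
    #   UrlFormat("流浪地球", "2023-01-28-00", "2023-02-10-00", 1)
    # returns
    #   https://s.weibo.com/weibo?q=%E6%B5%81%E6%B5%AA%E5%9C%B0%E7%90%83&typeall=1&suball=1&timescope=custom%3A2023-01-28-00%3A2023-02-10-00&Refer=g&page=1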

    def get_url_list(self, beginTime: str, endTime: str, proxies):
        page = 0
        NetPage = True
        self.UrlList = []
        print("\n\t >>> get blog url begin ...")
        while NetPage:
            page += 1
            targetUrl = self.UrlFormat(self.keyWord, beginTime, endTime, page)
            onePageList = Get_item_url_list(targetUrl, proxies)
            tmpLen = len(self.UrlList)
            self.UrlList.extend(onePageList)
            self.UrlList = list(set(self.UrlList))
            if len(self.UrlList) == tmpLen:
                NetPage = False
            print("\r\t\t page: {}, len: {}".format(page, len(self.UrlList)), end="")

    @staticmethod
    def select_field(data):
        # a 720p video url is only present when the post embeds a video
        if "page_info" in data.keys():
            video_url = data["page_info"]["media_info"]["mp4_720p_mp4"]
        else:
            video_url = ""
        selectedData = {
            # base fields
            "time": data["created_at"],
            "mid": data["mid"],
            "nick_name": data["user"]["screen_name"],
            "useId": data["user"]["id"],
            "mblogUrl": "https://weibo.com/{}/{}".format(
                data["user"]["id"], data["mid"]
            ),
            # additional fields
            "attitudes_count": data["attitudes_count"],
            "comments_count": data["comments_count"],
            "reposts_count": data["reposts_count"],
            "text": data["text"],
            "text_raw": data["text_raw"],
            "longTextContent": data["longTextContent"],
            "video_url": video_url,
        }
        return selectedData

    def main(self, beginTime, endTime, ColName=None):
        if ColName is None:
            ColName = self.keyWord
        if self.Mongo:
            _COL = MongoClient("Weibo", ColName)
        else:
            _COL = CsvClient("Weibo", ColName)
        print(">> get blog info begin ...")
        NewEndTime = endTime
        while NewEndTime > beginTime:
            print("\t time span: {} - {}".format(beginTime, NewEndTime), end="")
            self.get_url_list(beginTime, NewEndTime, self.proxies)
            if len(self.UrlList) <= 5:
                break
            print("\t >>> write blog url begin ...")
            BreakOrNot = True
            for mblogid in tqdm(self.UrlList):
                try:
                    data_json = Get_single_weibo_data(
                        mblogid.split("/")[-1], proxies=self.proxies
                    )
                    if data_json:
                        data_json = Get_longTextContent(data_json)
                        selectedData = self.select_field(data_json)
                        _COL.insert_one(selectedData)
                        # record this post's creation hour as the new time cursor
                        tmpTime = pd.to_datetime(selectedData["time"]).strftime(
                            "%Y-%m-%d-%H"
                        )
                    else:
                        tmpTime = None
                except json.decoder.JSONDecodeError:
                    tmpTime = None
                except KeyError:
                    tmpTime = None
                if tmpTime:
                    # pull the end of the search window back so the next round
                    # fetches posts older than the ones already saved
                    if NewEndTime >= tmpTime:
                        NewEndTime = tmpTime
                        BreakOrNot = False
            # stop when a whole batch failed to move the window backwards
            if BreakOrNot:
                break
            print("\t >>> write blog url done")
        print(">>> get blog info done")


if __name__ == "__main__":
    # a = time.time()
    WeiboClassV2("流浪地球", Mongo=False, proxies=False).main(
        "2023-01-28-00", "2023-02-10-00"
    )
    # print(time.time() - a)
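
# Usage sketch (illustrative, not part of the original file): the same crawl
# backed by MongoDB instead of CSV. It assumes a reachable MongoDB instance and
# that the Euclidweibo helpers (MongoClient, request utilities) are configured;
# ColName is optional and falls back to the keyword when omitted.
#
#   spider = WeiboClassV2("流浪地球", Mongo=True, proxies=False)
#   spider.main("2023-01-28-00", "2023-02-10-00", ColName="WanderingEarth")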