# scrape_utils.py
import requests
from bs4 import BeautifulSoup
import re
import csv
from urllib.parse import urljoin, urlparse
import phonenumbers
import datetime
import time
import random


def find_emails(text):
    # Match basic email addresses; allow TLDs longer than four characters (e.g. .photography)
    return re.findall(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", text)


def find_phone_numbers(text, default_region="US"):
    # Use phonenumbers' matcher and normalize every hit to E.164 format
    parsed_numbers = []
    for match in phonenumbers.PhoneNumberMatcher(text, default_region):
        parsed_numbers.append(
            phonenumbers.format_number(
                match.number, phonenumbers.PhoneNumberFormat.E164
            )
        )
    return parsed_numbers


def find_names(text):
    # Modify this regex pattern to match your required name format
    return re.findall(r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b", text)


def save_to_csv(data, file_name_prefix="scraped_data"):
    # Write one row per URL; the header matches the columns built in scrape_websites_deep_search
    current_date = datetime.date.today().strftime("%Y-%m-%d")
    file_name = f"{file_name_prefix}_{current_date}.csv"
    with open(file_name, "w", newline="", encoding="utf-8") as csvfile:
        csv_writer = csv.writer(csvfile)
        csv_writer.writerow(["URL", "Emails", "Phone Numbers"])
        for row in data:
            csv_writer.writerow(row)


def find_links(soup, base_url):
    # Collect every absolute http/https link on the page, including external domains
    links = set()
    for a in soup.find_all("a", href=True):
        href = a["href"]
        if not href.startswith("#"):
            url = urljoin(base_url, href)
            parsed_url = urlparse(url)
            if parsed_url.scheme in ["http", "https"]:
                links.add(url)
    return links


def find_links_v2(soup, base_url):
    # Like find_links, but only keep links that stay on the same domain as base_url
    links = set()
    base_netloc = urlparse(base_url).netloc
    for a in soup.find_all("a", href=True):
        href = a["href"]
        if not href.startswith("#"):
            url = urljoin(base_url, href)
            parsed_url = urlparse(url)
            if (
                parsed_url.scheme in ["http", "https"]
                and parsed_url.netloc == base_netloc
            ):
                links.add(url)
    return links


HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.5",
    "Accept-Encoding": "gzip, deflate, br",
    "DNT": "1",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
}

DELAY = 5  # Adjust the delay in seconds as needed

USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:65.0) Gecko/20100101 Firefox/65.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1.1 Safari/605.1.15",
    # Add more user agents as needed
]


def random_user_agent():
    # Pick a random User-Agent header for each request so the crawler looks less uniform
    return {"User-Agent": random.choice(USER_AGENTS)}


def scrape_website(url, visited_urls, depth=0, max_depth=2):
    # Recursively scrape a page and its same-domain links up to max_depth
    if depth > max_depth or url in visited_urls:
        return [], []
    visited_urls.add(url)
    print(f"running scrape_website on {url} (depth: {depth})")
    try:
        # A timeout keeps a slow or unresponsive server from blocking the crawl indefinitely
        response = requests.get(url, headers=random_user_agent(), timeout=10)
        time.sleep(DELAY)
    except requests.exceptions.RequestException as e:
        print(f"Error while requesting {url}: {e}")
        return [], []
    soup = BeautifulSoup(response.text, "html.parser")
    text = soup.get_text()
    # print("text: ", text)
    emails = find_emails(text)
    phone_numbers = find_phone_numbers(text)
    # names = find_names(text)
    if depth < max_depth:
        links = find_links_v2(soup, url)
        for link in links:
            sub_emails, sub_phone_numbers = scrape_website(
                link, visited_urls, depth + 1, max_depth
            )
            emails.extend(sub_emails)
            phone_numbers.extend(sub_phone_numbers)
            # names.extend(sub_names)
    return (
        emails,
        phone_numbers,
    )  # names


def scrape_websites_deep_search(urls, max_depth=2):
    # Crawl each starting URL, collect emails and phone numbers, then save everything to CSV
    scraped_data = []
    for url in urls:
        visited_urls = set()
        emails, phone_numbers = scrape_website(url, visited_urls, 0, max_depth)
        scraped_data.append(
            [
                url,
                ", ".join(emails),
                ", ".join(phone_numbers),
            ]
        )
    save_to_csv(scraped_data, "scraped_data")
    return scraped_data
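

# A minimal usage sketch, not part of the original module: the start URLs below are
# hypothetical placeholders; swap in real pages and adjust max_depth before running.
if __name__ == "__main__":
    start_urls = [
        "https://example.com",
        "https://example.org/contact",
    ]
    results = scrape_websites_deep_search(start_urls, max_depth=1)
    for site_url, emails, phone_numbers in results:
        print(site_url, "->", emails or "no emails", "|", phone_numbers or "no phone numbers")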