-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhttpx_list_concurrent.py
76 lines (67 loc) · 1.61 KB
/
httpx_list_concurrent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
"""
Proof of concept: fetch the bios of a list of users concurrently using asyncio and httpx.
"""
import asyncio
import time
import httpx
# Base endpoint for user lookups; get_bio appends "/<username>" to this URL.
BASE_URL = "https://hn.algolia.com/api/v1/users"
async def get_bio(username: str, client: httpx.AsyncClient) -> str:
    """
    Fetch the "about" text of a single user.

    Being a coroutine, many calls can be awaited together so the requests
    overlap instead of running one after another.

    Args:
        username: User name appended to ``BASE_URL`` to build the request URL.
        client: Shared ``httpx.AsyncClient`` used to issue the GET request.

    Returns:
        The ``"about"`` field of the JSON response, or an empty string when
        that field is missing or null.

    Raises:
        httpx.HTTPStatusError: If the server replies with a non-success status.
    """
    response = await client.get(f"{BASE_URL}/{username}")
    # Progress marker: one dot per response received.
    print(".")
    # Fail with a clear HTTP error rather than a misleading KeyError or
    # JSON decoding error when the request did not succeed.
    response.raise_for_status()
    data = response.json()
    # Honour the declared ``str`` return type even when no bio is present.
    return data.get("about") or ""
async def main() -> None:
    """
    Fetch the bio of every listed user concurrently and print the results.

    Prints a dict mapping each user name to its bio, followed by the total
    elapsed time. Repeated user names are requested only once; the printed
    mapping is unaffected because a dict holds each key once anyway.
    """
    # perf_counter is monotonic, so the measured interval cannot be skewed
    # by system clock adjustments.
    t_0 = time.perf_counter()
    usernames = [
        "author",
        "abtinf",
        "TheCoelacanth",
        "tomcam",
        "chauhankiran",
        "ulizzle",
        "ulizzle",
        "ulizzle",
        "cratermoon",
        "Aeolun",
        "ulizzle",
        "firexcy",
        "kazinator",
        "blacksoil",
        "lucakiebel",
        "ozim",
        "tomcam",
        "jstummbillig",
        "tomcam",
        "johnchristopher",
        "Tade0",
        "lallysingh",
        "paulddraper",
        "WilTimSon",
        "gumby",
        "kristopolous",
        "zemo",
        "aschearer",
        "why-el",
        "Osiris",
        "mdaniel",
        "ianbutler",
        "vinaypai",
        "samtho",
        "chazeon",
        "taeric",
        "yellowapple",
        "Kye",
    ]
    # Drop repeated names while keeping first-seen order, so each user is
    # requested exactly once.
    unique_usernames = list(dict.fromkeys(usernames))
    headers = {"User-Agent": "curl/7.72.0"}
    # NOTE: timeout=None disables httpx's timeouts, so a stalled request
    # will wait indefinitely.
    async with httpx.AsyncClient(headers=headers, timeout=None) as client:
        tasks = [get_bio(user, client) for user in unique_usernames]
        # gather returns results in the same order as the awaitables passed in.
        bios = await asyncio.gather(*tasks)
    print(dict(zip(unique_usernames, bios)))
    print(f"Total time: {time.perf_counter() - t_0:.3} seconds")
# Run only when executed as a script, so importing this module does not
# trigger network requests.
if __name__ == "__main__":
    asyncio.run(main())