-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathabstract.py
477 lines (426 loc) · 18.5 KB
/
abstract.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
# An interface for all clients to implement.
import logging
from abc import ABC, abstractmethod
from pathlib import Path
from typing import (
Any,
AsyncIterable,
Dict,
Iterable,
List,
Mapping,
Optional,
Tuple,
Type,
Union,
)
from aleph_message.models import (
AlephMessage,
MessagesResponse,
MessageType,
Payment,
PostMessage,
)
from aleph_message.models.execution.program import Encoding
from aleph_message.status import MessageStatus
from ..query.filters import MessageFilter, PostFilter
from ..query.responses import PostsResponse
from ..types import GenericMessage, StorageEnum
from ..utils import Writable
DEFAULT_PAGE_SIZE = 200
class AlephClient(ABC):
@abstractmethod
async def fetch_aggregate(self, address: str, key: str) -> Dict[str, Dict]:
"""
Fetch a value from the aggregate store by owner address and item key.
:param address: Address of the owner of the aggregate
:param key: Key of the aggregate
"""
raise NotImplementedError("Did you mean to import `AlephHttpClient`?")
@abstractmethod
async def fetch_aggregates(
self, address: str, keys: Optional[Iterable[str]] = None
) -> Dict[str, Dict]:
"""
Fetch key-value pairs from the aggregate store by owner address.
:param address: Address of the owner of the aggregate
:param keys: Keys of the aggregates to fetch (Default: all items)
"""
raise NotImplementedError("Did you mean to import `AlephHttpClient`?")
@abstractmethod
async def get_posts(
self,
page_size: int = DEFAULT_PAGE_SIZE,
page: int = 1,
post_filter: Optional[PostFilter] = None,
ignore_invalid_messages: Optional[bool] = True,
invalid_messages_log_level: Optional[int] = logging.NOTSET,
) -> PostsResponse:
"""
Fetch a list of posts from the network.
:param page_size: Number of items to fetch (Default: 200)
:param page: Page to fetch, begins at 1 (Default: 1)
:param post_filter: Filter to apply to the posts (Default: None)
:param ignore_invalid_messages: Ignore invalid messages (Default: True)
:param invalid_messages_log_level: Log level to use for invalid messages (Default: logging.NOTSET)
"""
raise NotImplementedError("Did you mean to import `AlephHttpClient`?")
async def get_posts_iterator(
self,
post_filter: Optional[PostFilter] = None,
) -> AsyncIterable[PostMessage]:
"""
Fetch all filtered posts, returning an async iterator and fetching them page by page. Might return duplicates
but will always return all posts.
:param post_filter: Filter to apply to the posts (Default: None)
"""
page = 1
resp = None
while resp is None or len(resp.posts) > 0:
resp = await self.get_posts(
page=page,
post_filter=post_filter,
)
page += 1
for post in resp.posts:
yield post
@abstractmethod
async def download_file(
self,
file_hash: str,
) -> bytes:
"""
Get a file from the storage engine as raw bytes.
Warning: Downloading large files can be slow and memory intensive.
:param file_hash: The hash of the file to retrieve.
"""
raise NotImplementedError("Did you mean to import `AlephHttpClient`?")
async def download_file_ipfs(
self,
file_hash: str,
) -> bytes:
"""
Get a file from the ipfs storage engine as raw bytes.
Warning: Downloading large files can be slow.
:param file_hash: The hash of the file to retrieve.
"""
raise NotImplementedError()
async def download_file_ipfs_to_buffer(
self,
file_hash: str,
output_buffer: Writable[bytes],
) -> None:
"""
Download a file from the storage engine and write it to the specified output buffer.
:param file_hash: The hash of the file to retrieve.
:param output_buffer: The binary output buffer to write the file data to.
"""
raise NotImplementedError()
async def download_file_to_buffer(
self,
file_hash: str,
output_buffer: Writable[bytes],
) -> None:
"""
Download a file from the storage engine and write it to the specified output buffer.
:param file_hash: The hash of the file to retrieve.
:param output_buffer: Writable binary buffer. The file will be written to this buffer.
"""
raise NotImplementedError()
@abstractmethod
async def get_messages(
self,
page_size: int = DEFAULT_PAGE_SIZE,
page: int = 1,
message_filter: Optional[MessageFilter] = None,
ignore_invalid_messages: Optional[bool] = True,
invalid_messages_log_level: Optional[int] = logging.NOTSET,
) -> MessagesResponse:
"""
Fetch a list of messages from the network.
:param page_size: Number of items to fetch (Default: 200)
:param page: Page to fetch, begins at 1 (Default: 1)
:param message_filter: Filter to apply to the messages
:param ignore_invalid_messages: Ignore invalid messages (Default: True)
:param invalid_messages_log_level: Log level to use for invalid messages (Default: logging.NOTSET)
"""
raise NotImplementedError("Did you mean to import `AlephHttpClient`?")
async def get_messages_iterator(
self,
message_filter: Optional[MessageFilter] = None,
) -> AsyncIterable[AlephMessage]:
"""
Fetch all filtered messages, returning an async iterator and fetching them page by page. Might return duplicates
but will always return all messages.
:param message_filter: Filter to apply to the messages
"""
page = 1
resp = None
while resp is None or len(resp.messages) > 0:
resp = await self.get_messages(
page=page,
message_filter=message_filter,
)
page += 1
for message in resp.messages:
yield message
@abstractmethod
async def get_message(
self,
item_hash: str,
message_type: Optional[Type[GenericMessage]] = None,
) -> GenericMessage:
"""
Get a single message from its `item_hash` and perform some basic validation.
:param item_hash: Hash of the message to fetch
:param message_type: Type of message to fetch
"""
raise NotImplementedError("Did you mean to import `AlephHttpClient`?")
@abstractmethod
def watch_messages(
self,
message_filter: Optional[MessageFilter] = None,
) -> AsyncIterable[AlephMessage]:
"""
Iterate over current and future matching messages asynchronously.
:param message_filter: Filter to apply to the messages
"""
raise NotImplementedError("Did you mean to import `AlephHttpClient`?")
class AuthenticatedAlephClient(AlephClient):
@abstractmethod
async def create_post(
self,
post_content: Any,
post_type: str,
ref: Optional[str] = None,
address: Optional[str] = None,
channel: Optional[str] = None,
inline: bool = True,
storage_engine: StorageEnum = StorageEnum.storage,
sync: bool = False,
) -> Tuple[AlephMessage, MessageStatus]:
"""
Create a POST message on the aleph.im network. It is associated with a channel and owned by an account.
:param post_content: The content of the message
:param post_type: An arbitrary content type that helps to describe the post_content
:param ref: A reference to a previous message that it replaces
:param address: The address that will be displayed as the author of the message
:param channel: The channel that the message will be posted on
:param inline: An optional flag to indicate if the content should be inlined in the message or not
:param storage_engine: An optional storage engine to use for the message, if not inlined (Default: "storage")
:param sync: If true, waits for the message to be processed by the API server (Default: False)
"""
raise NotImplementedError(
"Did you mean to import `AuthenticatedAlephHttpClient`?"
)
@abstractmethod
async def create_aggregate(
self,
key: str,
content: Mapping[str, Any],
address: Optional[str] = None,
channel: Optional[str] = None,
inline: bool = True,
sync: bool = False,
) -> Tuple[AlephMessage, MessageStatus]:
"""
Create an AGGREGATE message. It is meant to be used as a quick access storage associated with an account.
:param key: Key to use to store the content
:param content: Content to store
:param address: Address to use to sign the message
:param channel: Channel to use (Default: "TEST")
:param inline: Whether to write content inside the message (Default: True)
:param sync: If true, waits for the message to be processed by the API server (Default: False)
"""
raise NotImplementedError(
"Did you mean to import `AuthenticatedAlephHttpClient`?"
)
@abstractmethod
async def create_store(
self,
address: Optional[str] = None,
file_content: Optional[bytes] = None,
file_path: Optional[Union[str, Path]] = None,
file_hash: Optional[str] = None,
guess_mime_type: bool = False,
ref: Optional[str] = None,
storage_engine: StorageEnum = StorageEnum.storage,
extra_fields: Optional[dict] = None,
channel: Optional[str] = None,
sync: bool = False,
) -> Tuple[AlephMessage, MessageStatus]:
"""
Create a STORE message to store a file on the aleph.im network.
Can be passed either a file path, an IPFS hash or the file's content as raw bytes.
:param address: Address to display as the author of the message (Default: account.get_address())
:param file_content: Byte stream of the file to store (Default: None)
:param file_path: Path to the file to store (Default: None)
:param file_hash: Hash of the file to store (Default: None)
:param guess_mime_type: Guess the MIME type of the file (Default: False)
:param ref: Reference to a previous message (Default: None)
:param storage_engine: Storage engine to use (Default: "storage")
:param extra_fields: Extra fields to add to the STORE message (Default: None)
:param channel: Channel to post the message to (Default: "TEST")
:param sync: If true, waits for the message to be processed by the API server (Default: False)
"""
raise NotImplementedError(
"Did you mean to import `AuthenticatedAlephHttpClient`?"
)
@abstractmethod
async def create_program(
self,
program_ref: str,
entrypoint: str,
runtime: str,
environment_variables: Optional[Mapping[str, str]] = None,
storage_engine: StorageEnum = StorageEnum.storage,
channel: Optional[str] = None,
address: Optional[str] = None,
sync: bool = False,
memory: Optional[int] = None,
vcpus: Optional[int] = None,
timeout_seconds: Optional[float] = None,
persistent: bool = False,
allow_amend: bool = False,
internet: bool = True,
aleph_api: bool = True,
encoding: Encoding = Encoding.zip,
volumes: Optional[List[Mapping]] = None,
subscriptions: Optional[List[Mapping]] = None,
metadata: Optional[Mapping[str, Any]] = None,
) -> Tuple[AlephMessage, MessageStatus]:
"""
Post a (create) PROGRAM message.
:param program_ref: Reference to the program to run
:param entrypoint: Entrypoint to run
:param runtime: Runtime to use
:param environment_variables: Environment variables to pass to the program
:param storage_engine: Storage engine to use (Default: "storage")
:param channel: Channel to use (Default: "TEST")
:param address: Address to use (Default: account.get_address())
:param sync: If true, waits for the message to be processed by the API server
:param memory: Memory in MB for the VM to be allocated (Default: 128)
:param vcpus: Number of vCPUs to allocate (Default: 1)
:param timeout_seconds: Timeout in seconds (Default: 30.0)
:param persistent: Whether the program should be persistent or not (Default: False)
:param allow_amend: Whether the deployed VM image may be changed (Default: False)
:param internet: Whether the VM should have internet connectivity. (Default: True)
:param aleph_api: Whether the VM needs access to Aleph messages API (Default: True)
:param encoding: Encoding to use (Default: Encoding.zip)
:param volumes: Volumes to mount
:param subscriptions: Patterns of aleph.im messages to forward to the program's event receiver
:param metadata: Metadata to attach to the message
"""
raise NotImplementedError(
"Did you mean to import `AuthenticatedAlephHttpClient`?"
)
@abstractmethod
async def create_instance(
self,
rootfs: str,
rootfs_size: int,
rootfs_name: str,
payment: Optional[Payment] = None,
environment_variables: Optional[Mapping[str, str]] = None,
storage_engine: StorageEnum = StorageEnum.storage,
channel: Optional[str] = None,
address: Optional[str] = None,
sync: bool = False,
memory: Optional[int] = None,
vcpus: Optional[int] = None,
timeout_seconds: Optional[float] = None,
allow_amend: bool = False,
internet: bool = True,
aleph_api: bool = True,
volumes: Optional[List[Mapping]] = None,
volume_persistence: str = "host",
ssh_keys: Optional[List[str]] = None,
metadata: Optional[Mapping[str, Any]] = None,
) -> Tuple[AlephMessage, MessageStatus]:
"""
Post a (create) PROGRAM message.
:param rootfs: Root filesystem to use
:param rootfs_size: Size of root filesystem
:param rootfs_name: Name of root filesystem
:param payment: Payment method used to pay for the instance
:param environment_variables: Environment variables to pass to the program
:param storage_engine: Storage engine to use (Default: "storage")
:param channel: Channel to use (Default: "TEST")
:param address: Address to use (Default: account.get_address())
:param sync: If true, waits for the message to be processed by the API server
:param memory: Memory in MB for the VM to be allocated (Default: 128)
:param vcpus: Number of vCPUs to allocate (Default: 1)
:param timeout_seconds: Timeout in seconds (Default: 30.0)
:param allow_amend: Whether the deployed VM image may be changed (Default: False)
:param internet: Whether the VM should have internet connectivity. (Default: True)
:param aleph_api: Whether the VM needs access to Aleph messages API (Default: True)
:param encoding: Encoding to use (Default: Encoding.zip)
:param volumes: Volumes to mount
:param volume_persistence: Where volumes are persisted, can be "host" or "store", meaning distributed across Aleph.im (Default: "host")
:param ssh_keys: SSH keys to authorize access to the VM
:param metadata: Metadata to attach to the message
"""
raise NotImplementedError(
"Did you mean to import `AuthenticatedAlephHttpClient`?"
)
@abstractmethod
async def forget(
self,
hashes: List[str],
reason: Optional[str],
storage_engine: StorageEnum = StorageEnum.storage,
channel: Optional[str] = None,
address: Optional[str] = None,
sync: bool = False,
) -> Tuple[AlephMessage, MessageStatus]:
"""
Post a FORGET message to remove previous messages from the network.
Targeted messages need to be signed by the same account that is attempting to forget them,
if the creating address did not delegate the access rights to the forgetting account.
:param hashes: Hashes of the messages to forget
:param reason: Reason for forgetting the messages
:param storage_engine: Storage engine to use (Default: "storage")
:param channel: Channel to use (Default: "TEST")
:param address: Address to use (Default: account.get_address())
:param sync: If true, waits for the message to be processed by the API server (Default: False)
"""
raise NotImplementedError(
"Did you mean to import `AuthenticatedAlephHttpClient`?"
)
@abstractmethod
async def submit(
self,
content: Dict[str, Any],
message_type: MessageType,
channel: Optional[str] = None,
storage_engine: StorageEnum = StorageEnum.storage,
allow_inlining: bool = True,
sync: bool = False,
raise_on_rejected: bool = True,
) -> Tuple[AlephMessage, MessageStatus, Optional[Dict[str, Any]]]:
"""
Submit a message to the network. This is a generic method that can be used to submit any type of message.
Prefer using the more specific methods to submit messages.
:param content: Content of the message
:param message_type: Type of the message
:param channel: Channel to use (Default: "TEST")
:param storage_engine: Storage engine to use (Default: "storage")
:param allow_inlining: Whether to allow inlining the content of the message (Default: True)
:param sync: If true, waits for the message to be processed by the API server (Default: False)
:param raise_on_rejected: Whether to raise an exception if the message is rejected (Default: True)
"""
raise NotImplementedError(
"Did you mean to import `AuthenticatedAlephHttpClient`?"
)
async def ipfs_push(self, content: Mapping) -> str:
"""
Push a file to IPFS.
:param content: Content of the file to push
"""
raise NotImplementedError()
async def storage_push(self, content: Mapping) -> str:
"""
Push arbitrary content as JSON to the storage service.
:param content: The dict-like content to upload
"""
raise NotImplementedError()