-
-
Notifications
You must be signed in to change notification settings - Fork 127
/
Copy pathobserver_concept.py
71 lines (48 loc) · 1.51 KB
/
observer_concept.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# pylint: disable=too-few-public-methods
# pylint: disable=arguments-differ
"Observer Design Pattern Concept"
from abc import ABCMeta, abstractmethod
class IObservable(metaclass=ABCMeta):
    """The Subject Interface.

    Declares the three operations of an observable as abstract static
    methods. Each body is only a docstring, so the interface carries no
    behaviour of its own and cannot be instantiated directly.
    """

    @staticmethod
    @abstractmethod
    def subscribe(observer):
        """The subscribe method.

        Takes the ``observer`` to be registered.
        """

    @staticmethod
    @abstractmethod
    def unsubscribe(observer):
        """The unsubscribe method.

        Takes the ``observer`` to be removed.
        """

    @staticmethod
    @abstractmethod
    def notify(observer):
        """The notify method.

        Declared here with a single ``observer`` parameter.
        """
class Subject(IObservable):
    """The Subject (a.k.a Observable).

    Holds a set of observers and forwards every notification to each one
    by calling its ``notify`` method.
    """

    def __init__(self):
        # A set, so subscribing the same observer twice keeps one entry.
        # Observers must therefore be hashable.
        self._observers = set()

    def subscribe(self, observer):
        """Add ``observer`` to the set of objects that get notified."""
        self._observers.add(observer)

    def unsubscribe(self, observer):
        """Remove ``observer`` from the set of objects that get notified.

        Raises:
            KeyError: if ``observer`` is not currently subscribed.
        """
        self._observers.remove(observer)

    def notify(self, *args):
        """Call ``notify(*args)`` on every subscribed observer.

        The loop runs over a snapshot of the observer set. An observer may
        therefore subscribe or unsubscribe (itself or others) from inside
        its ``notify`` callback without triggering
        ``RuntimeError: Set changed size during iteration``. Observers
        removed during a round are still notified in that round; observers
        added during a round are first notified in the next one.
        """
        for observer in tuple(self._observers):
            observer.notify(*args)
class IObserver(metaclass=ABCMeta):
    """A method for the Observer to implement.

    The interface consists of a single abstract static method, ``notify``,
    whose body is only a docstring.
    """

    @staticmethod
    @abstractmethod
    def notify(observable, *args):
        """Receive notifications.

        Declared with an ``observable`` parameter followed by any number of
        positional arguments.
        """
class Observer(IObserver):
    """The concrete observer.

    Registers itself with the observable passed to the constructor and
    prints each notification it receives.
    """

    def __init__(self, observable):
        # Self-registration: constructing an observer is enough to start
        # receiving notifications from ``observable``.
        observable.subscribe(self)

    def notify(self, *args):
        """Print this observer's ``id`` together with the received args."""
        message = f"Observer id:{id(self)} received {args}"
        print(message)
# The Client
# Build one subject; each observer subscribes itself on construction.
SUBJECT = Subject()
OBSERVER_A, OBSERVER_B = Observer(SUBJECT), Observer(SUBJECT)

# Both observers are subscribed at this point, so each one prints.
SUBJECT.notify("First Notification", [1, 2, 3])

# After OBSERVER_B is removed, only OBSERVER_A receives the second round.
SUBJECT.unsubscribe(OBSERVER_B)
SUBJECT.notify("Second Notification", {"A": 1, "B": 2, "C": 3})