source: trunk/src/allmydata/util/observer.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

  • Property mode set to 100644
File size: 4.9 KB
Line 
1"""
2Observer for Twisted code.
3
4Ported to Python 3.
5"""
6
7import weakref
8from twisted.internet import defer
9from foolscap.api import eventually
10from twisted.logger import (
11    Logger,
12)
13
14"""The idiom we use is for the observed object to offer a method named
15'when_something', which returns a deferred.  That deferred will be fired when
16something happens.  The way this is typically implemented is that the observed
17has an ObserverList whose when_fired method is called in the observed's
18'when_something'."""
19
20class OneShotObserverList(object):
21    """A one-shot event distributor."""
22    def __init__(self):
23        self._fired = False
24        self._result = None
25        self._watchers = []
26        self.__repr__ = self._unfired_repr
27
28    def __repr__(self):
29        """string representation of the OneshotObserverList"""
30        if self._fired:
31            return self._fired_repr()
32        return self._unfired_repr()
33
34    def _unfired_repr(self):
35        return "<OneShotObserverList [%s]>" % (self._watchers, )
36
37    def _fired_repr(self):
38        return "<OneShotObserverList -> %s>" % (self._result, )
39
40    def _get_result(self):
41        return self._result
42
43    def when_fired(self):
44        if self._fired:
45            return defer.succeed(self._get_result())
46        d = defer.Deferred()
47        self._watchers.append(d)
48        return d
49
50    def fire(self, result):
51        assert not self._fired
52        self._fired = True
53        self._result = result
54        self._fire(result)
55
56    def _fire(self, result):
57        for w in self._watchers:
58            w.callback(result)
59        del self._watchers
60        self.__repr__ = self._fired_repr
61
62    def fire_if_not_fired(self, result):
63        if not self._fired:
64            self.fire(result)
65
66class LazyOneShotObserverList(OneShotObserverList):
67    """
68    a variant of OneShotObserverList which does not retain
69    the result it handles, but rather retains a callable()
70    through which is retrieves the data if and when needed.
71    """
72    def __init__(self):
73        OneShotObserverList.__init__(self)
74
75    def _get_result(self):
76        return self._result_producer()
77
78    def fire(self, result_producer):
79        """
80        @param result_producer: a no-arg callable which
81        returns the data which is to be considered the
82        'result' for this observer list.  note that this
83        function may be called multiple times - once
84        upon initial firing, and potentially once more
85        for each subsequent when_fired() deferred created
86        """
87        assert not self._fired
88        self._fired = True
89        self._result_producer = result_producer
90        if self._watchers: # if not, don't call result_producer
91            self._fire(self._get_result())
92
93class ObserverList(object):
94    """
95    Immediately distribute events to a number of subscribers.
96    """
97    _logger = Logger()
98
99    def __init__(self):
100        self._watchers = []
101
102    def subscribe(self, observer):
103        self._watchers.append(observer)
104
105    def unsubscribe(self, observer):
106        self._watchers.remove(observer)
107
108    def notify(self, *args, **kwargs):
109        for o in self._watchers[:]:
110            try:
111                o(*args, **kwargs)
112            except Exception:
113                self._logger.failure("While notifying {o!r}", o=o)
114
115class EventStreamObserver(object):
116    """A simple class to distribute multiple events to a single subscriber.
117    It accepts arbitrary kwargs, but no posargs."""
118    def __init__(self):
119        self._watcher = None
120        self._undelivered_results = []
121        self._canceler = None
122
123    def set_canceler(self, c, methname):
124        """I will call c.METHNAME(self) when somebody cancels me."""
125        # we use a weakref to avoid creating a cycle between us and the thing
126        # we're observing: they'll be holding a reference to us to compare
127        # against the value we pass to their canceler function. However,
128        # since bound methods are first-class objects (and not kept alive by
129        # the object they're bound to), we can't just stash a weakref to the
130        # bound cancel method. Instead, we must hold a weakref to the actual
131        # object, and obtain its cancel method later.
132        # http://code.activestate.com/recipes/81253-weakmethod/ has an
133        # alternative.
134        self._canceler = (weakref.ref(c), methname)
135
136    def subscribe(self, observer, **watcher_kwargs):
137        self._watcher = (observer, watcher_kwargs)
138        while self._undelivered_results:
139            self._notify(self._undelivered_results.pop(0))
140
141    def notify(self, **result_kwargs):
142        if self._watcher:
143            self._notify(result_kwargs)
144        else:
145            self._undelivered_results.append(result_kwargs)
146
147    def _notify(self, result_kwargs):
148        o, watcher_kwargs = self._watcher
149        kwargs = dict(result_kwargs)
150        kwargs.update(watcher_kwargs)
151        eventually(o, **kwargs)
152
153    def cancel(self):
154        wr,methname = self._canceler
155        o = wr()
156        if o:
157            getattr(o,methname)(self)
Note: See TracBrowser for help on using the repository browser.