source: trunk/src/allmydata/test/test_observer.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

  • Property mode set to 100644
File size: 4.1 KB
Line 
1"""
2Tests for allmydata.util.observer.
3
4Ported to Python 3.
5"""
6
7from twisted.trial import unittest
8from twisted.internet import defer, reactor
9from allmydata.util import observer
10
11def nextTurn(res=None):
12    d = defer.Deferred()
13    reactor.callLater(1, d.callback, res)
14    return d
15
16class Observer(unittest.TestCase):
17    def test_oneshot(self):
18        ol = observer.OneShotObserverList()
19        rep = repr(ol)
20        self.failUnlessEqual(rep, "<OneShotObserverList [[]]>")
21        d1 = ol.when_fired()
22        d2 = ol.when_fired()
23        def _addmore(res):
24            self.failUnlessEqual(res, "result")
25            d3 = ol.when_fired()
26            d3.addCallback(self.failUnlessEqual, "result")
27            return d3
28        d1.addCallback(_addmore)
29        ol.fire("result")
30        rep = repr(ol)
31        self.failUnlessEqual(rep, "<OneShotObserverList -> result>")
32        d4 = ol.when_fired()
33        dl = defer.DeferredList([d1,d2,d4])
34        return dl
35
36    def test_oneshot_fireagain(self):
37        ol = observer.OneShotObserverList()
38        d = ol.when_fired()
39        def _addmore(res):
40            self.failUnlessEqual(res, "result")
41            ol.fire_if_not_fired("result3") # should be ignored
42            d2 = ol.when_fired()
43            d2.addCallback(self.failUnlessEqual, "result")
44            return d2
45        d.addCallback(_addmore)
46        ol.fire_if_not_fired("result")
47        ol.fire_if_not_fired("result2")
48        return d
49
50    def test_lazy_oneshot(self):
51        ol = observer.LazyOneShotObserverList()
52        d1 = ol.when_fired()
53        d2 = ol.when_fired()
54        def _addmore(res):
55            self.failUnlessEqual(res, "result")
56            d3 = ol.when_fired()
57            d3.addCallback(self.failUnlessEqual, "result")
58            return d3
59        d1.addCallback(_addmore)
60        def _get_result():
61            return "result"
62        ol.fire(_get_result)
63        d4 = ol.when_fired()
64        dl = defer.DeferredList([d1,d2,d4])
65        return dl
66
67    def test_observerlist(self):
68        ol = observer.ObserverList()
69        l1 = []
70        l2 = []
71        l3 = []
72        ol.subscribe(l1.append)
73        ol.notify(1)
74        ol.subscribe(l2.append)
75        ol.notify(2)
76        ol.unsubscribe(l1.append)
77        ol.notify(3)
78        def _check(res):
79            self.failUnlessEqual(l1, [1,2])
80            self.failUnlessEqual(l2, [2,3])
81        d = nextTurn()
82        d.addCallback(_check)
83        def _step2(res):
84            def _add(a, b, c=None):
85                l3.append((a,b,c))
86            ol.unsubscribe(l2.append)
87            ol.subscribe(_add)
88            ol.notify(4, 5, c=6)
89            return nextTurn()
90        def _check2(res):
91            self.failUnlessEqual(l3, [(4,5,6)])
92        d.addCallback(_step2)
93        d.addCallback(_check2)
94        return d
95
96    def test_observer_list_reentrant(self):
97        """
98        ``ObserverList`` is reentrant.
99        """
100        observed = []
101
102        def observer_one():
103            obs.unsubscribe(observer_one)
104
105        def observer_two():
106            observed.append(None)
107
108        obs = observer.ObserverList()
109        obs.subscribe(observer_one)
110        obs.subscribe(observer_two)
111        obs.notify()
112
113        self.assertEqual([None], observed)
114
115    def test_observer_list_observer_errors(self):
116        """
117        An error in an earlier observer does not prevent notification from being
118        delivered to a later observer.
119        """
120        observed = []
121
122        def observer_one():
123            raise Exception("Some problem here")
124
125        def observer_two():
126            observed.append(None)
127
128        obs = observer.ObserverList()
129        obs.subscribe(observer_one)
130        obs.subscribe(observer_two)
131        obs.notify()
132
133        self.assertEqual([None], observed)
134        self.assertEqual(1, len(self.flushLoggedErrors(Exception)))
135
136    def test_observer_list_propagate_keyboardinterrupt(self):
137        """
138        ``KeyboardInterrupt`` escapes ``ObserverList.notify``.
139        """
140        def observer_one():
141            raise KeyboardInterrupt()
142
143        obs = observer.ObserverList()
144        obs.subscribe(observer_one)
145
146        with self.assertRaises(KeyboardInterrupt):
147            obs.notify()
Note: See TracBrowser for help on using the repository browser.