diff -rN -u old-trunk/src/allmydata/immutable/downloader/finder.py new-trunk/src/allmydata/immutable/downloader/finder.py
--- old-trunk/src/allmydata/immutable/downloader/finder.py 2010-09-07 00:58:36.000000000 -0600
+++ new-trunk/src/allmydata/immutable/downloader/finder.py 2010-09-07 00:58:42.000000000 -0600
@@ -49,14 +49,17 @@
         self._lp = log.msg(format="ShareFinder[si=%(si)s] starting",
                            si=self._si_prefix,
                            level=log.NOISY, parent=logparent, umid="2xjj2A")
+        log.msg("xxx %s.__init__(%s, %s, %s, %s, %s, %s)" % (self, storage_broker, verifycap, node, download_status, logparent, max_outstanding_requests))

     def update_num_segments(self):
+        log.msg("xxx %s.update_num_segments()" % (self,))
         (numsegs, authoritative) = self.node.get_num_segments()
         assert authoritative
         for cs in self._commonshares.values():
             cs.set_authoritative_num_segments(numsegs)

     def start_finding_servers(self):
+        log.msg("xxx %s.start_finding_servers()" % (self,))
         # don't get servers until somebody uses us: creating the
         # ImmutableFileNode should not cause work to happen yet. Test case is
         # test_dirnode, which creates us with storage_broker=None
@@ -72,6 +75,7 @@
         return log.msg(*args, **kwargs)

     def stop(self):
+        log.msg("xxx %s.stop()" % (self,))
         self.running = False
         while self.overdue_timers:
             req,t = self.overdue_timers.popitem()
@@ -79,6 +83,7 @@

     # called by our parent CiphertextDownloader
     def hungry(self):
+        log.msg("xxx %s.hungry()" % (self,))
         self.log(format="ShareFinder[si=%(si)s] hungry",
                  si=self._si_prefix, level=log.NOISY, umid="NywYaQ")
         self.start_finding_servers()
@@ -87,6 +92,7 @@

     # internal methods
     def loop(self):
+        log.msg("xxx %s.loop()" % (self,))
         pending_s = ",".join([idlib.shortnodeid_b2a(rt.peerid)
                               for rt in self.pending_requests]) # sort?
         self.log(format="ShareFinder loop: running=%(running)s"
@@ -114,6 +120,7 @@
                 self.send_request(server)
                 # we loop again to get parallel queries. The check above will
                 # prevent us from looping forever.
+                log.msg("xxx %s.loop() => loop again to get parallel queries" % (self,))
                 eventually(self.loop)
                 return

@@ -130,6 +137,7 @@
             self.share_consumer.no_more_shares()

     def send_request(self, server):
+        log.msg("xxx %s.send_request(%s)" % (self, server))
         peerid, rref = server
         req = RequestToken(peerid)
         self.pending_requests.add(req)
@@ -152,6 +160,7 @@
         d.addCallback(incidentally, eventually, self.loop)

     def _request_retired(self, req):
+        log.msg("xxx %s._request_retired(%s)" % (self, req))
         self.pending_requests.discard(req)
         self.overdue_requests.discard(req)
         if req in self.overdue_timers:
@@ -159,6 +168,7 @@
             del self.overdue_timers[req]

     def overdue(self, req):
+        log.msg("xxx %s.overdue(%s)" % (self, req))
         del self.overdue_timers[req]
         assert req in self.pending_requests # paranoia, should never be false
         self.overdue_requests.add(req)
@@ -166,6 +176,7 @@

     def _got_response(self, buckets, server_version, peerid, req, d_ev,
                       time_sent, lp):
+        log.msg("xxx %s._got_response(%s, %s, %s, %s, %s, %s, %s)" % (self, buckets, server_version, peerid, req, d_ev, time_sent, lp))
         shnums = sorted([shnum for shnum in buckets])
         time_received = now()
         d_ev.finished(shnums, time_received)
@@ -187,6 +198,7 @@
             self._deliver_shares(shares)

     def _create_share(self, shnum, bucket, server_version, peerid, dyhb_rtt):
+        log.msg("xxx %s._create_share(%s, %s, %s, %s, %s)" % (self, shnum, bucket, server_version, peerid, dyhb_rtt))
         if shnum in self._commonshares:
             cs = self._commonshares[shnum]
         else:
@@ -215,6 +227,7 @@
         return s

     def _deliver_shares(self, shares):
+        log.msg("xxx %s._deliver_shares(%s)" % (self, shares))
         # they will call hungry() again if they want more
         self._hungry = False
         shares_s = ",".join([str(sh) for sh in shares])
@@ -223,9 +236,8 @@
         eventually(self.share_consumer.got_shares, shares)

     def _got_error(self, f, peerid, req, d_ev, lp):
+        log.msg("xxx %s._got_error(%s, %s, %s, %s, %s)" % (self, f, peerid, req, d_ev, lp))
         d_ev.finished("error", now())
         self.log(format="got error from [%(peerid)s]",
                  peerid=idlib.shortnodeid_b2a(peerid), failure=f,
                  level=log.UNUSUAL, parent=lp, umid="zUKdCw")
-
-
diff -rN -u old-trunk/src/allmydata/immutable/downloader/node.py new-trunk/src/allmydata/immutable/downloader/node.py
--- old-trunk/src/allmydata/immutable/downloader/node.py 2010-09-07 00:58:36.000000000 -0600
+++ new-trunk/src/allmydata/immutable/downloader/node.py 2010-09-07 00:58:42.000000000 -0600
@@ -221,10 +221,12 @@

     # called by our child ShareFinder
     def got_shares(self, shares):
+        log.msg("xxx %s.got_shares(%s)" % (self, shares))
         self._shares.update(shares)
         if self._active_segment:
             self._active_segment.add_shares(shares)
     def no_more_shares(self):
+        log.msg("xxx %s.no_more_shares() ; _active_segment: %s" % (self, self._active_segment))
         self._no_more_shares = True
         if self._active_segment:
             self._active_segment.no_more_shares()
diff -rN -u old-trunk/src/allmydata/immutable/downloader/share.py new-trunk/src/allmydata/immutable/downloader/share.py
--- old-trunk/src/allmydata/immutable/downloader/share.py 2010-09-07 00:58:36.000000000 -0600
+++ new-trunk/src/allmydata/immutable/downloader/share.py 2010-09-07 00:58:42.000000000 -0600
@@ -83,6 +83,7 @@

         self._requested_blocks = [] # (segnum, set(observer2..))
         ver = server_version["http://allmydata.org/tahoe/protocols/storage/v1"]
+        log.msg( "zzz ver: %r" % (ver,))
         self._overrun_ok = ver["tolerates-immutable-read-overrun"]
         # If _overrun_ok and we guess the offsets correctly, we can get
         # everything in one RTT. If _overrun_ok and we guess wrong, we might
diff -rN -u old-trunk/src/allmydata/test/test_immutable.py new-trunk/src/allmydata/test/test_immutable.py
--- old-trunk/src/allmydata/test/test_immutable.py 2010-09-07 00:58:38.000000000 -0600
+++ new-trunk/src/allmydata/test/test_immutable.py 2010-09-07 00:58:43.000000000 -0600
@@ -1,10 +1,83 @@
 from allmydata.test import common
 from allmydata.interfaces import NotEnoughSharesError
 from allmydata.util.consumer import download_to_data
+from allmydata import uri
 from twisted.internet import defer
 from twisted.trial import unittest
 import random

+from foolscap.api import eventually
+from allmydata.util import log
+
+from allmydata.immutable.downloader import finder
+
+import mock
+
+class TestShareFinder(unittest.TestCase):
+    def test_sharefinder_last_request_provides_last_share(self):
+        # ticket #1191
+
+        # Suppose that K=2 and you send two DYHB requests, the first
+        # response offers one share, and then the second offers one
+        # share. Don't give up after you've received the second DYHB
+        # response and before you've realized that the response
+        # contains an offer of a share.
+
+        rcap = uri.CHKFileURI('a'*32, 'a'*32, 2, 99, 100)
+        vcap = rcap.get_verify_cap()
+
+        class MockServer(object):
+            def __init__(self, buckets):
+                self.version = {
+                    'http://allmydata.org/tahoe/protocols/storage/v1': {
+                        "tolerates-immutable-read-overrun": True
+                        }
+                    }
+                self.buckets = buckets
+            def callRemote(self, methname, *args, **kwargs):
+                log.msg("yyy 2 %s.callRemote(%s, %s, %s)" % (self, methname, args, kwargs))
+                d = defer.Deferred()
+                eventually(eventually, d.callback, self.buckets)
+                return d
+
+        mockserver1 = MockServer({1: mock.Mock()})
+        mockserver2 = MockServer({2: mock.Mock()})
+        mockstoragebroker = mock.Mock()
+        mockstoragebroker.get_servers_for_index.return_value = [ ('ms1', mockserver1), ('ms2', mockserver2) ]
+        mockdownloadstatus = mock.Mock()
+        mocknode = mock.Mock()
+        class MockNode(object):
+            def __init__(self, testcase):
+                self.testcase = testcase
+                self.got = 0
+                self.finished_d = defer.Deferred()
+                self.got_shares_d = defer.Deferred()
+                self.segment_size = 78
+                self.guessed_segment_size = 78
+            def when_finished(self):
+                return self.finished_d
+            def when_got_shares(self):
+                return self.got_shares_d
+            def get_num_segments(self):
+                return (2, True)
+            def _calculate_sizes(self, guessed_segment_size):
+                return {'block_size': 3, 'num_segments': 2}
+            def no_more_shares(self):
+                self.testcase.fail("The node was told by the share finder that it is destined to remain hungry.")
+            def got_shares(self, shares):
+                self.got += 1
+                log.msg("yyy 3 %s.got_shares(%s) got: %s" % (self, shares, self.got))
+                if self.got == 1:
+                    self.got_shares_d.callback(None)
+                elif self.got == 2:
+                    self.finished_d.callback(None)
+        mocknode = MockNode(self)
+
+        s = finder.ShareFinder(mockstoragebroker, vcap, mocknode, mockdownloadstatus)
+        s.hungry()
+
+        return mocknode.when_finished()
+
 class Test(common.ShareManglingMixin, common.ShouldFailMixin, unittest.TestCase):
     def test_test_code(self):
         # The following process of stashing the shares, running
---|