"""
Ported to Python 3.
"""

import os, time
from io import BytesIO
from itertools import count
from zope.interface import implementer
from twisted.internet import defer
from twisted.python import failure

from allmydata.crypto import aes
from allmydata.crypto import rsa
from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION, \
                                 IMutableUploadable
from allmydata.util import base32, hashutil, mathutil, log
from allmydata.util.dictutil import DictOfSets
from allmydata.util.deferredutil import async_to_deferred
from allmydata.util.cputhreadpool import defer_to_thread
from allmydata import hashtree, codec
from allmydata.storage.server import si_b2a
from foolscap.api import eventually, fireEventually
from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, MODE_REPAIR, \
     UncoordinatedWriteError, NotEnoughServersError
from allmydata.mutable.servermap import ServerMap
from allmydata.mutable.layout import get_version_from_checkstring,\
                                     unpack_mdmf_checkstring, \
                                     unpack_sdmf_checkstring, \
                                     MDMFSlotWriteProxy, \
                                     SDMFSlotWriteProxy

from eliot import (
    Message,
    start_action,
)

KiB = 1024
DEFAULT_MUTABLE_MAX_SEGMENT_SIZE = 128 * KiB
PUSHING_BLOCKS_STATE = 0
PUSHING_EVERYTHING_ELSE_STATE = 1
DONE_STATE = 2

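# A rough sketch of how these state constants are used (see Publish._push
# below): a publish starts in PUSHING_BLOCKS_STATE while encrypted, encoded
# blocks are sent segment by segment; it moves to
# PUSHING_EVERYTHING_ELSE_STATE while the block hash trees, share hash chain,
# signature, and verification key are sent; and it reaches DONE_STATE once the
# final writes have been issued and answered.
#
#     PUSHING_BLOCKS_STATE --(last segment pushed)--> PUSHING_EVERYTHING_ELSE_STATE
#     PUSHING_EVERYTHING_ELSE_STATE --(finish_publishing fires)--> DONE_STATE
#
# If too few writers remain (fewer than required_shares) or a surprise share
# is seen, _push() short-circuits to _failure() instead.
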
@implementer(IPublishStatus)
class PublishStatus(object):
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["send_per_server"] = {}
        self.timings["encrypt"] = 0.0
        self.timings["encode"] = 0.0
        self.servermap = None
        self._problems = {}
        self.active = True
        self.storage_index = None
        self.helper = False
        self.encoding = ("?", "?")
        self.size = None
        self.status = "Not started"
        self.progress = 0.0
        self.counter = next(self.statusid_counter)
        self.started = time.time()

    def add_per_server_time(self, server, elapsed):
        if server not in self.timings["send_per_server"]:
            self.timings["send_per_server"][server] = []
        self.timings["send_per_server"][server].append(elapsed)
    def accumulate_encode_time(self, elapsed):
        self.timings["encode"] += elapsed
    def accumulate_encrypt_time(self, elapsed):
        self.timings["encrypt"] += elapsed

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_encoding(self):
        return self.encoding
    def using_helper(self):
        return self.helper
    def get_servermap(self):
        return self.servermap
    def get_size(self):
        return self.size
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter
    def get_problems(self):
        return self._problems

    def set_storage_index(self, si):
        self.storage_index = si
    def set_helper(self, helper):
        self.helper = helper
    def set_servermap(self, servermap):
        self.servermap = servermap
    def set_encoding(self, k, n):
        self.encoding = (k, n)
    def set_size(self, size):
        self.size = size
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value

class LoopLimitExceededError(Exception):
    pass

class Publish(object):
    """I represent a single act of publishing the mutable file to the grid. I
    will only publish my data if the servermap I am using still represents
    the current state of the world.

    To make the initial publish, set servermap to None.
    """

    def __init__(self, filenode, storage_broker, servermap):
        self._node = filenode
        self._storage_broker = storage_broker
        self._servermap = servermap
        self._storage_index = self._node.get_storage_index()
        self._log_prefix = prefix = si_b2a(self._storage_index)[:5]
        num = self.log("Publish(%r): starting" % prefix, parent=None)
        self._log_number = num
        self._running = True
        self._first_write_error = None
        self._last_failure = None

        self._status = PublishStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_helper(False)
        self._status.set_progress(0.0)
        self._status.set_active(True)
        self._version = self._node.get_version()
        assert self._version in (SDMF_VERSION, MDMF_VERSION)


    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if 'parent' not in kwargs:
            kwargs['parent'] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.publish"
        return log.msg(*args, **kwargs)


    def update(self, data, offset, blockhashes, version):
        """
        I replace the contents of this file with the contents of data,
        starting at offset. I return a Deferred that fires with None
        when the replacement has been completed, or with an error if
        something went wrong during the process.

        Note that this process will not upload new shares. If the file
        being updated is in need of repair, callers will have to repair
        it on their own.
        """
        # How this works:
        # 1: Make server assignments. We'll assign each share that we know
        #    about on the grid to the server that currently holds that
        #    share, and will not place any new shares.
        # 2: Setup encoding parameters. Most of these will stay the same
        #    -- datalength will change, as will some of the offsets.
        # 3. Upload the new segments.
        # 4. Be done.
        assert IMutableUploadable.providedBy(data)

        self.data = data

        # XXX: Use the MutableFileVersion instead.
        self.datalength = self._node.get_size()
        if data.get_size() > self.datalength:
            self.datalength = data.get_size()

        self.log("starting update")
        self.log("adding new data of length %d at offset %d" % \
                 (data.get_size(), offset))
        self.log("new data length is %d" % self.datalength)
        self._status.set_size(self.datalength)
        self._status.set_status("Started")
        self._started = time.time()

        self.done_deferred = defer.Deferred()

        self._writekey = self._node.get_writekey()
        assert self._writekey, "need write capability to publish"

        # first, which servers will we publish to? We require that the
        # servermap was updated in MODE_WRITE, so we can depend upon the
        # serverlist computed by that process instead of computing our own.
        assert self._servermap
        assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
        # we will push a version that is one larger than anything present
        # in the grid, according to the servermap.
        self._new_seqnum = self._servermap.highest_seqnum() + 1
        self._status.set_servermap(self._servermap)

        self.log(format="new seqnum will be %(seqnum)d",
                 seqnum=self._new_seqnum, level=log.NOISY)

        # We're updating an existing file, so all of the following
        # should be available.
        self.readkey = self._node.get_readkey()
        self.required_shares = self._node.get_required_shares()
        assert self.required_shares is not None
        self.total_shares = self._node.get_total_shares()
        assert self.total_shares is not None
        self._status.set_encoding(self.required_shares, self.total_shares)

        self._pubkey = self._node.get_pubkey()
        assert self._pubkey
        self._privkey = self._node.get_privkey()
        assert self._privkey
        self._encprivkey = self._node.get_encprivkey()

        sb = self._storage_broker
        full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
        self.full_serverlist = full_serverlist # for use later, immutable
        self.bad_servers = set() # servers who have errbacked/refused requests

        # This will set self.segment_size, self.num_segments, and
        # self.fec. TODO: Does it know how to do the offset? Probably
        # not. So do that part next.
        self.setup_encoding_parameters(offset=offset)

        # if we experience any surprises (writes which were rejected because
        # our test vector did not match, or shares which we didn't expect to
        # see), we set this flag and report an UncoordinatedWriteError at the
        # end of the publish process.
        self.surprised = False

        # we keep track of three tables. The first is our goal: which share
        # we want to see on which servers. This is initially populated by the
        # existing servermap.
        self.goal = set() # pairs of (server, shnum) tuples

        # the second is the number of outstanding queries: those that are in
        # flight and may or may not be delivered, accepted, or acknowledged.
        # This is incremented when a query is sent, and decremented when the
        # response returns or errbacks.
        self.num_outstanding = 0

        # the third is a table of successes: shares which have actually been
        # placed. These are populated when responses come back with success.
        # When self.placed == self.goal, we're done.
        self.placed = set() # (server, shnum) tuples

        self.bad_share_checkstrings = {}

        # This is set at the last step of the publishing process.
        self.versioninfo = ""

        # we use the servermap to populate the initial goal: this way we will
        # try to update each existing share in place. Since we're
        # updating, we ignore damaged and missing shares -- callers must
        # do a repair to repair and recreate these.
        self.goal = set(self._servermap.get_known_shares())

        # shnum -> set of IMutableSlotWriter
        self.writers = DictOfSets()

        # SDMF files are updated differently.
        self._version = MDMF_VERSION
        writer_class = MDMFSlotWriteProxy

        # For each (server, shnum) in self.goal, we make a
        # write proxy for that server. We'll use this to write
        # shares to the server.
        for (server,shnum) in self.goal:
            write_enabler = self._node.get_write_enabler(server)
            renew_secret = self._node.get_renewal_secret(server)
            cancel_secret = self._node.get_cancel_secret(server)
            secrets = (write_enabler, renew_secret, cancel_secret)

            writer = writer_class(shnum,
                                  server.get_storage_server(),
                                  self._storage_index,
                                  secrets,
                                  self._new_seqnum,
                                  self.required_shares,
                                  self.total_shares,
                                  self.segment_size,
                                  self.datalength)

            self.writers.add(shnum, writer)
            writer.server = server
            known_shares = self._servermap.get_known_shares()
            assert (server, shnum) in known_shares
            old_versionid, old_timestamp = known_shares[(server,shnum)]
            (old_seqnum, old_root_hash, old_salt, old_segsize,
             old_datalength, old_k, old_N, old_prefix,
             old_offsets_tuple) = old_versionid
            writer.set_checkstring(old_seqnum,
                                   old_root_hash,
                                   old_salt)

        # Our remote shares will not have a complete checkstring until
        # after we are done writing share data and have started to write
        # blocks. In the meantime, we need to know what to look for when
        # writing, so that we can detect UncoordinatedWriteErrors.
        self._checkstring = self._get_some_writer().get_checkstring()

        # Now, we start pushing shares.
        self._status.timings["setup"] = time.time() - self._started
        # First, we encrypt, encode, and publish the shares that we need
        # to encrypt, encode, and publish.

        # Our update process fetched these for us. We need to update
        # them in place as publishing happens.
        self.blockhashes = {} # shnum -> [blockhashes]
        for (i, bht) in list(blockhashes.items()):
            # We need to extract the leaves from our old hash tree.
            old_segcount = mathutil.div_ceil(version[4],
                                             version[3])
            h = hashtree.IncompleteHashTree(old_segcount)
            bht = dict(enumerate(bht))
            h.set_hashes(bht)
            leaves = h[h.get_leaf_index(0):]
            for j in range(self.num_segments - len(leaves)):
                leaves.append(None)

            assert len(leaves) >= self.num_segments
            self.blockhashes[i] = leaves
            # This list will now be the leaves that were set during the
            # initial upload + enough empty hashes to make it a
            # power-of-two. If we exceed a power of two boundary, we
            # should be encoding the file over again, and should not be
            # here. So, we have
            #assert len(self.blockhashes[i]) == \
            #       hashtree.roundup_pow2(self.num_segments), \
            #       len(self.blockhashes[i])
            # XXX: Except this doesn't work. Figure out why.

        # These are filled in later, after we've modified the block hash
        # tree suitably.
        self.sharehash_leaves = None # eventually [sharehashes]
        self.sharehashes = {} # shnum -> [sharehash leaves necessary to
                              # validate the share]

        self.log("Starting push")

        self._state = PUSHING_BLOCKS_STATE
        self._push()

        return self.done_deferred


    def publish(self, newdata):
        """Publish the filenode's current contents. Returns a Deferred that
        fires (with None) when the publish has done as much work as it's ever
        going to do, or errbacks with ConsistencyError if it detects a
        simultaneous write.
        """

        # 0. Setup encoding parameters, encoder, and other such things.
        # 1. Encrypt, encode, and publish segments.
        assert IMutableUploadable.providedBy(newdata)

        self.data = newdata
        self.datalength = newdata.get_size()
        #if self.datalength >= DEFAULT_MUTABLE_MAX_SEGMENT_SIZE:
        #    self._version = MDMF_VERSION
        #else:
        #    self._version = SDMF_VERSION

        self.log("starting publish, datalen is %s" % self.datalength)
        self._status.set_size(self.datalength)
        self._status.set_status("Started")
        self._started = time.time()

        self.done_deferred = defer.Deferred()

        self._writekey = self._node.get_writekey()
        assert self._writekey, "need write capability to publish"

        # first, which servers will we publish to? We require that the
        # servermap was updated in MODE_WRITE, so we can depend upon the
        # serverlist computed by that process instead of computing our own.
        if self._servermap:
            assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
            # we will push a version that is one larger than anything present
            # in the grid, according to the servermap.
            self._new_seqnum = self._servermap.highest_seqnum() + 1
        else:
            # If we don't have a servermap, that's because we're doing the
            # initial publish
            self._new_seqnum = 1
            self._servermap = ServerMap()
        self._status.set_servermap(self._servermap)

        self.log(format="new seqnum will be %(seqnum)d",
                 seqnum=self._new_seqnum, level=log.NOISY)

        # having an up-to-date servermap (or using a filenode that was just
        # created for the first time) also guarantees that the following
        # fields are available
        self.readkey = self._node.get_readkey()
        self.required_shares = self._node.get_required_shares()
        assert self.required_shares is not None
        self.total_shares = self._node.get_total_shares()
        assert self.total_shares is not None
        self._status.set_encoding(self.required_shares, self.total_shares)

        self._pubkey = self._node.get_pubkey()
        assert self._pubkey
        self._privkey = self._node.get_privkey()
        assert self._privkey
        self._encprivkey = self._node.get_encprivkey()

        sb = self._storage_broker
        full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
        self.full_serverlist = full_serverlist # for use later, immutable
        self.bad_servers = set() # servers who have errbacked/refused requests

        # This will set self.segment_size, self.num_segments, and
        # self.fec.
        self.setup_encoding_parameters()

        # if we experience any surprises (writes which were rejected because
        # our test vector did not match, or shares which we didn't expect to
        # see), we set this flag and report an UncoordinatedWriteError at the
        # end of the publish process.
        self.surprised = False

        # we keep track of three tables. The first is our goal: which share
        # we want to see on which servers. This is initially populated by the
        # existing servermap.
        self.goal = set() # pairs of (server, shnum) tuples

        # the second is the number of outstanding queries: those that are in
        # flight and may or may not be delivered, accepted, or acknowledged.
        # This is incremented when a query is sent, and decremented when the
        # response returns or errbacks.
        self.num_outstanding = 0

        # the third is a table of successes: shares which have actually been
        # placed. These are populated when responses come back with success.
        # When self.placed == self.goal, we're done.
        self.placed = set() # (server, shnum) tuples

        self.bad_share_checkstrings = {}

        # This is set at the last step of the publishing process.
        self.versioninfo = ""

        # we use the servermap to populate the initial goal: this way we will
        # try to update each existing share in place.
        self.goal = set(self._servermap.get_known_shares())

        # then we add in all the shares that were bad (corrupted, bad
        # signatures, etc). We want to replace these.
        for key, old_checkstring in list(self._servermap.get_bad_shares().items()):
            (server, shnum) = key
            self.goal.add( (server,shnum) )
            self.bad_share_checkstrings[(server,shnum)] = old_checkstring

        # TODO: Make this part do server selection.
        self.update_goal()

        # shnum -> set of IMutableSlotWriter
        self.writers = DictOfSets()

        if self._version == MDMF_VERSION:
            writer_class = MDMFSlotWriteProxy
        else:
            writer_class = SDMFSlotWriteProxy

        # For each (server, shnum) in self.goal, we make a
        # write proxy for that server. We'll use this to write
        # shares to the server.
        for (server,shnum) in self.goal:
            write_enabler = self._node.get_write_enabler(server)
            renew_secret = self._node.get_renewal_secret(server)
            cancel_secret = self._node.get_cancel_secret(server)
            secrets = (write_enabler, renew_secret, cancel_secret)

            writer = writer_class(shnum,
                                  server.get_storage_server(),
                                  self._storage_index,
                                  secrets,
                                  self._new_seqnum,
                                  self.required_shares,
                                  self.total_shares,
                                  self.segment_size,
                                  self.datalength)
            self.writers.add(shnum, writer)
            writer.server = server
            known_shares = self._servermap.get_known_shares()
            if (server, shnum) in known_shares:
                old_versionid, old_timestamp = known_shares[(server,shnum)]
                (old_seqnum, old_root_hash, old_salt, old_segsize,
                 old_datalength, old_k, old_N, old_prefix,
                 old_offsets_tuple) = old_versionid
                writer.set_checkstring(old_seqnum,
                                       old_root_hash,
                                       old_salt)
            elif (server, shnum) in self.bad_share_checkstrings:
                old_checkstring = self.bad_share_checkstrings[(server, shnum)]
                writer.set_checkstring(old_checkstring)

        # Our remote shares will not have a complete checkstring until
        # after we are done writing share data and have started to write
        # blocks. In the meantime, we need to know what to look for when
        # writing, so that we can detect UncoordinatedWriteErrors.
        self._checkstring = self._get_some_writer().get_checkstring()

        # Now, we start pushing shares.
        self._status.timings["setup"] = time.time() - self._started
        # First, we encrypt, encode, and publish the shares that we need
        # to encrypt, encode, and publish.

        # This will eventually hold the block hash chain for each share
        # that we publish. We define it this way so that empty publishes
        # will still have something to write to the remote slot.
        self.blockhashes = dict([(i, []) for i in range(self.total_shares)])
        for i in range(self.total_shares):
            blocks = self.blockhashes[i]
            for j in range(self.num_segments):
                blocks.append(None)
        self.sharehash_leaves = None # eventually [sharehashes]
        self.sharehashes = {} # shnum -> [sharehash leaves necessary to
                              # validate the share]

        self.log("Starting push")

        self._state = PUSHING_BLOCKS_STATE
        self._push()

        return self.done_deferred

    def _get_some_writer(self):
        return list(list(self.writers.values())[0])[0]

    def _update_status(self):
        self._status.set_status("Sending Shares: %d placed out of %d, "
                                "%d messages outstanding" %
                                (len(self.placed),
                                 len(self.goal),
                                 self.num_outstanding))
        self._status.set_progress(1.0 * len(self.placed) / len(self.goal))


    def setup_encoding_parameters(self, offset=0):
        if self._version == MDMF_VERSION:
            segment_size = DEFAULT_MUTABLE_MAX_SEGMENT_SIZE # 128 KiB by default
        else:
            segment_size = self.datalength # SDMF is only one segment
        # this must be a multiple of self.required_shares
        segment_size = mathutil.next_multiple(segment_size,
                                              self.required_shares)
        self.segment_size = segment_size

        # Calculate the starting segment for the upload.
        if segment_size:
            # We use div_ceil instead of integer division here because
            # it is semantically correct.
            # If datalength isn't an even multiple of segment_size, but
            # is larger than segment_size, datalength // segment_size
            # will be the largest number such that num <= datalength and
            # num % segment_size == 0. But that's not what we want,
            # because it ignores the extra data. div_ceil will give us
            # the right number of segments for the data that we're
            # given.
            self.num_segments = mathutil.div_ceil(self.datalength,
                                                  segment_size)

            self.starting_segment = offset // segment_size

        else:
            self.num_segments = 0
            self.starting_segment = 0


        self.log("building encoding parameters for file")
        self.log("got segsize %d" % self.segment_size)
        self.log("got %d segments" % self.num_segments)

        if self._version == SDMF_VERSION:
            assert self.num_segments in (0, 1) # SDMF
        # calculate the tail segment size.

        if segment_size and self.datalength:
            self.tail_segment_size = self.datalength % segment_size
            self.log("got tail segment size %d" % self.tail_segment_size)
        else:
            self.tail_segment_size = 0

        if self.tail_segment_size == 0 and segment_size:
            # The tail segment is the same size as the other segments.
            self.tail_segment_size = segment_size

        # Make FEC encoders
        fec = codec.CRSEncoder()
        fec.set_params(self.segment_size,
                       self.required_shares, self.total_shares)
        self.piece_size = fec.get_block_size()
        self.fec = fec

        if self.tail_segment_size == self.segment_size:
            self.tail_fec = self.fec
        else:
            tail_fec = codec.CRSEncoder()
            tail_fec.set_params(self.tail_segment_size,
                                self.required_shares,
                                self.total_shares)
            self.tail_fec = tail_fec

        self._current_segment = self.starting_segment
        self.end_segment = self.num_segments - 1
        # Now figure out where the last segment should be.
        if self.data.get_size() != self.datalength:
            # We're updating a few segments in the middle of a mutable
            # file, so we don't want to republish the whole thing.
            # (we don't have enough data to do that even if we wanted
            # to)
            end = self.data.get_size()
            self.end_segment = end // segment_size
            if end % segment_size == 0:
                self.end_segment -= 1

        self.log("got start segment %d" % self.starting_segment)
        self.log("got end segment %d" % self.end_segment)


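    # Illustrative arithmetic for the parameters computed above (a sketch,
    # assuming the default 3-of-10 encoding and the default MDMF segment
    # size, not a prescribed configuration):
    #
    #   segment_size      = next_multiple(128 * 1024, 3) = 131073
    #   datalength        = 300000
    #   num_segments      = div_ceil(300000, 131073) = 3
    #   tail_segment_size = 300000 % 131073 = 37854
    #
    # The tail segment gets its own CRSEncoder (tail_fec) because its input
    # size differs from that of the full-sized segments.
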
    def _push(self, ignored=None):
        """
        I manage state transitions. In particular, I see that we still
        have a good enough number of writers to complete the upload
        successfully.
        """
        # Can we still successfully publish this file?
        # TODO: Keep track of outstanding queries before aborting the
        #       process.
        num_shnums = len(self.writers)
        if num_shnums < self.required_shares or self.surprised:
            return self._failure()

        # Figure out what we need to do next. Each of these needs to
        # return a deferred so that we don't block execution when this
        # is first called in the upload method.
        if self._state == PUSHING_BLOCKS_STATE:
            return self.push_segment(self._current_segment)

        elif self._state == PUSHING_EVERYTHING_ELSE_STATE:
            return self.push_everything_else()

        # If we make it to this point, we were successful in placing the
        # file.
        return self._done()


    def push_segment(self, segnum):
        if self.num_segments == 0 and self._version == SDMF_VERSION:
            self._add_dummy_salts()

        if segnum > self.end_segment:
            # We don't have any more segments to push.
            self._state = PUSHING_EVERYTHING_ELSE_STATE
            return self._push()

        d = self._encode_segment(segnum)
        d.addCallback(self._push_segment, segnum)
        def _increment_segnum(ign):
            self._current_segment += 1
        # XXX: I don't think we need to do addBoth here -- any errbacks
        #      should be handled within push_segment.
        d.addCallback(_increment_segnum)
        d.addCallback(self._turn_barrier)
        d.addCallback(self._push)
        d.addErrback(self._failure)


    def _turn_barrier(self, result):
        """
        I help the publish process avoid the recursion limit issues
        described in #237.
        """
        return fireEventually(result)


    def _add_dummy_salts(self):
        """
        SDMF files need a salt even if they're empty, or the signature
        won't make sense. This method adds a dummy salt to each of our
        SDMF writers so that they can write the signature later.
        """
        salt = os.urandom(16)
        assert self._version == SDMF_VERSION

        for shnum, writers in self.writers.items():
            for writer in writers:
                writer.put_salt(salt)


    @async_to_deferred
    async def _encode_segment(self, segnum):
        """
        I encrypt and encode the segment segnum.
        """
        started = time.time()

        if segnum + 1 == self.num_segments:
            segsize = self.tail_segment_size
        else:
            segsize = self.segment_size


        self.log("Pushing segment %d of %d" % (segnum + 1, self.num_segments))
        data = self.data.read(segsize)
        if not isinstance(data, bytes):
            # XXX: Why does this return a list?
            data = b"".join(data)

        assert len(data) == segsize, len(data)

        self._status.set_status("Encrypting")

        def encrypt(readkey):
            salt = os.urandom(16)
            key = hashutil.ssk_readkey_data_hash(salt, readkey)
            encryptor = aes.create_encryptor(key)
            crypttext = aes.encrypt_data(encryptor, data)
            assert len(crypttext) == len(data)
            return salt, crypttext

        salt, crypttext = await defer_to_thread(encrypt, self.readkey)

        now = time.time()
        self._status.accumulate_encrypt_time(now - started)
        started = now

        # now apply FEC
        if segnum + 1 == self.num_segments:
            fec = self.tail_fec
        else:
            fec = self.fec

        self._status.set_status("Encoding")
        crypttext_pieces = [None] * self.required_shares
        piece_size = fec.get_block_size()
        for i in range(len(crypttext_pieces)):
            offset = i * piece_size
            piece = crypttext[offset:offset+piece_size]
            piece = piece + b"\x00"*(piece_size - len(piece)) # padding
            crypttext_pieces[i] = piece
            assert len(piece) == piece_size

        res = await fec.encode(crypttext_pieces)
        elapsed = time.time() - started
        self._status.accumulate_encode_time(elapsed)
        return (res, salt)

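    # Descriptive note on the per-segment pipeline above (no new behavior):
    # each segment gets a fresh 16-byte salt, the AES key is derived with
    # hashutil.ssk_readkey_data_hash(salt, readkey), and the resulting
    # crypttext is split into required_shares pieces (zero-padded up to
    # piece_size) before the CRSEncoder turns them into total_shares blocks.
    # MDMF stores one salt per segment; SDMF is effectively a single segment,
    # so a version carries a single salt.
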
    @async_to_deferred
    async def _push_segment(self, encoded_and_salt, segnum):
        """
        I push (data, salt) as segment number segnum.
        """
        results, salt = encoded_and_salt
        shares, shareids = results
        self._status.set_status("Pushing segment")
        for i in range(len(shares)):
            sharedata = shares[i]
            shareid = shareids[i]
            if self._version == MDMF_VERSION:
                hashed = salt + sharedata
            else:
                hashed = sharedata
            block_hash = await defer_to_thread(hashutil.block_hash, hashed)
            self.blockhashes[shareid][segnum] = block_hash
            # find the writer for this share
            writers = self.writers[shareid]
            for writer in writers:
                writer.put_block(sharedata, segnum, salt)


    def push_everything_else(self):
        """
        I put everything else associated with a share.
        """
        self._pack_started = time.time()
        self.push_encprivkey()
        self.push_blockhashes()
        self.push_sharehashes()
        self.push_toplevel_hashes_and_signature()
        d = self.finish_publishing()
        def _change_state(ignored):
            self._state = DONE_STATE
        d.addCallback(_change_state)
        d.addCallback(self._push)
        return d


    def push_encprivkey(self):
        encprivkey = self._encprivkey
        self._status.set_status("Pushing encrypted private key")
        for shnum, writers in self.writers.items():
            for writer in writers:
                writer.put_encprivkey(encprivkey)


    def push_blockhashes(self):
        self.sharehash_leaves = [None] * len(self.blockhashes)
        self._status.set_status("Building and pushing block hash tree")
        for shnum, blockhashes in list(self.blockhashes.items()):
            t = hashtree.HashTree(blockhashes)
            self.blockhashes[shnum] = list(t)
            # set the leaf for future use.
            self.sharehash_leaves[shnum] = t[0]

            writers = self.writers[shnum]
            for writer in writers:
                writer.put_blockhashes(self.blockhashes[shnum])


    def push_sharehashes(self):
        self._status.set_status("Building and pushing share hash chain")
        share_hash_tree = hashtree.HashTree(self.sharehash_leaves)
        for shnum in range(len(self.sharehash_leaves)):
            needed_indices = share_hash_tree.needed_hashes(shnum)
            self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
                                              for i in needed_indices] )
            writers = self.writers[shnum]
            for writer in writers:
                writer.put_sharehashes(self.sharehashes[shnum])
        self.root_hash = share_hash_tree[0]


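    # Hash-tree layout used by the two methods above (a descriptive note, not
    # new behavior): blockhashes[shnum] starts as one hash per segment;
    # hashtree.HashTree expands those leaves into a full Merkle tree whose
    # root becomes sharehash_leaves[shnum]. A second HashTree built over
    # those per-share roots yields root_hash (share_hash_tree[0]), which is
    # what gets signed via push_toplevel_hashes_and_signature below.
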
    def push_toplevel_hashes_and_signature(self):
        # We need to do three things here:
        #   - Push the root hash and salt hash
        #   - Get the checkstring of the resulting layout; sign that.
        #   - Push the signature
        self._status.set_status("Pushing root hashes and signature")
        for shnum in range(self.total_shares):
            writers = self.writers[shnum]
            for writer in writers:
                writer.put_root_hash(self.root_hash)
        self._update_checkstring()
        self._make_and_place_signature()


    def _update_checkstring(self):
        """
        After putting the root hash, MDMF files will have the
        checkstring written to the storage server. This means that we
        can update our copy of the checkstring so we can detect
        uncoordinated writes. SDMF files will have the same checkstring,
        so we need not do anything.
        """
        self._checkstring = self._get_some_writer().get_checkstring()


    def _make_and_place_signature(self):
        """
        I create and place the signature.
        """
        started = time.time()
        self._status.set_status("Signing prefix")
        signable = self._get_some_writer().get_signable()
        self.signature = rsa.sign_data(self._privkey, signable)

        for (shnum, writers) in self.writers.items():
            for writer in writers:
                writer.put_signature(self.signature)
        self._status.timings['sign'] = time.time() - started


    def finish_publishing(self):
        # We're almost done -- we just need to put the verification key
        # and the offsets
        started = time.time()
        self._status.set_status("Pushing shares")
        self._started_pushing = started
        ds = []
        verification_key = rsa.der_string_from_verifying_key(self._pubkey)

        for (shnum, writers) in list(self.writers.copy().items()):
            for writer in writers:
                writer.put_verification_key(verification_key)
                self.num_outstanding += 1
                def _no_longer_outstanding(res):
                    self.num_outstanding -= 1
                    return res

                d = writer.finish_publishing()
                d.addBoth(_no_longer_outstanding)
                d.addErrback(self._connection_problem, writer)
                d.addCallback(self._got_write_answer, writer, started)
                ds.append(d)
        self._record_verinfo()
        self._status.timings['pack'] = time.time() - started
        return defer.DeferredList(ds)


    def _record_verinfo(self):
        self.versioninfo = self._get_some_writer().get_verinfo()


    def _connection_problem(self, f, writer):
        """
        We ran into a connection problem while working with writer, and
        need to deal with that.
        """
        self.log("found problem: %s" % str(f))
        self._last_failure = f
        self.writers.discard(writer.shnum, writer)


    def log_goal(self, goal, message=""):
        logmsg = [message]
        for (shnum, server) in sorted([(s,p) for (p,s) in goal], key=lambda t: (id(t[0]), id(t[1]))):
            logmsg.append("sh%d to [%r]" % (shnum, server.get_name()))
        self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
        self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
                 level=log.NOISY)

    def update_goal(self):
        self.log_goal(self.goal, "before update: ")

        # first, remove any bad servers from our goal
        self.goal = set([ (server, shnum)
                          for (server, shnum) in self.goal
                          if server not in self.bad_servers ])

        # find the homeless shares:
        homefull_shares = set([shnum for (server, shnum) in self.goal])
        homeless_shares = set(range(self.total_shares)) - homefull_shares
        homeless_shares = sorted(list(homeless_shares))
        # place them somewhere. We prefer unused servers at the beginning of
        # the available server list.

        if not homeless_shares:
            return

        # if an old share X is on a node, put the new share X there too.
        # TODO: 1: redistribute shares to achieve one-per-server, by copying
        #       shares from existing servers to new (less-crowded) ones. The
        #       old shares must still be updated.
        # TODO: 2: move those shares instead of copying them, to reduce future
        #       update work

        # this is a bit CPU intensive but easy to analyze. We create a sort
        # order for each server. If the server is marked as bad, we don't
        # even put them in the list. Then we care about the number of shares
        # which have already been assigned to them. After that we care about
        # their permutation order.
        old_assignments = DictOfSets()
        for (server, shnum) in self.goal:
            old_assignments.add(server, shnum)

        serverlist = []

        action = start_action(
            action_type=u"mutable:upload:update_goal",
            homeless_shares=len(homeless_shares),
        )
        with action:
            for i, server in enumerate(self.full_serverlist):
                serverid = server.get_serverid()
                if server in self.bad_servers:
                    Message.log(
                        message_type=u"mutable:upload:bad-server",
                        server_id=serverid,
                    )
                    continue
                # if we have >= 1 grid-managers, this checks that we have
                # a valid certificate for this server
                if not server.upload_permitted():
                    Message.log(
                        message_type=u"mutable:upload:no-gm-certs",
                        server_id=serverid,
                    )
                    continue

                entry = (len(old_assignments.get(server, [])), i, serverid, server)
                serverlist.append(entry)
        serverlist.sort()

        if not serverlist:
            raise NotEnoughServersError("Ran out of non-bad servers, "
                                        "first_error=%s" %
                                        str(self._first_write_error),
                                        self._first_write_error)

        # we then index this serverlist with an integer, because we may have
        # to wrap. We update the goal as we go.
        i = 0
        for shnum in homeless_shares:
            (ignored1, ignored2, ignored3, server) = serverlist[i]
            # if we are forced to send a share to a server that already has
            # one, we may have two write requests in flight, and the
            # servermap (which was computed before either request was sent)
            # won't reflect the new shares, so the second response will be
            # surprising. There is code in _got_write_answer() to tolerate
            # this, otherwise it would cause the publish to fail with an
            # UncoordinatedWriteError. See #546 for details of the trouble
            # this used to cause.
            self.goal.add( (server, shnum) )
            i += 1
            if i >= len(serverlist):
                i = 0
        self.log_goal(self.goal, "after update: ")


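    # Placement sketch for the method above: each usable server gets a sort
    # key of (shares already assigned to it, position in the permuted server
    # list), so lightly-loaded servers come first. Homeless shares are then
    # dealt out round-robin over that sorted list, wrapping around when there
    # are more shares than servers.
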
    def _got_write_answer(self, answer, writer, started):
        if not answer:
            # SDMF writers only pretend to write when callers set their
            # blocks, salts, and so on -- they actually just write once,
            # at the end of the upload process. In fake writes, they
            # return defer.succeed(None). If we see that, we shouldn't
            # bother checking it.
            return

        server = writer.server
        lp = self.log("_got_write_answer from %r, share %d" %
                      (server.get_name(), writer.shnum))

        now = time.time()
        elapsed = now - started

        self._status.add_per_server_time(server, elapsed)

        wrote, read_data = answer

        surprise_shares = set(read_data.keys()) - set([writer.shnum])

        # We need to remove from surprise_shares any shares that we are
        # knowingly also writing to that server from other writers.

        # TODO: Precompute this.
        shares = []
        for shnum, writers in self.writers.items():
            shares.extend([x.shnum for x in writers if x.server == server])
        known_shnums = set(shares)
        surprise_shares -= known_shnums
        self.log("found the following surprise shares: %s" %
                 str(surprise_shares))

        # Now surprise_shares contains all of the shares that we did not
        # expect to be there.

        surprised = False
        for shnum in surprise_shares:
            # read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
            checkstring = read_data[shnum][0]
            # What we want to do here is to see if their (seqnum,
            # roothash, salt) is the same as our (seqnum, roothash,
            # salt), or the equivalent for MDMF. The best way to do this
            # is to store a packed representation of our checkstring
            # somewhere, then not bother unpacking the other
            # checkstring.
            if checkstring == self._checkstring:
                # they have the right share, somehow

                if (server,shnum) in self.goal:
                    # and we want them to have it, so we probably sent them a
                    # copy in an earlier write. This is ok, and avoids the
                    # #546 problem.
                    continue

                # They aren't in our goal, but they are still for the right
                # version. Somebody else wrote them, and it's a convergent
                # uncoordinated write. Pretend this is ok (don't be
                # surprised), since I suspect there's a decent chance that
                # we'll hit this in normal operation.
                continue

            else:
                # the new shares are of a different version
                if server in self._servermap.get_reachable_servers():
                    # we asked them about their shares, so we had knowledge
                    # of what they used to have. Any surprising shares must
                    # have come from someone else, so UCW.
                    surprised = True
                else:
                    # we didn't ask them, and now we've discovered that they
                    # have a share we didn't know about. This indicates that
                    # mapupdate should have worked harder and asked more
                    # servers before concluding that it knew about them all.

                    # signal UCW, but make sure to ask this server next time,
                    # so we'll remember to update it if/when we retry.
                    surprised = True
                    # TODO: ask this server next time. I don't yet have a good
                    # way to do this. Two insufficient possibilities are:
                    #
                    # self._servermap.add_new_share(server, shnum, verinfo, now)
                    #  but that requires fetching/validating/parsing the whole
                    #  version string, and all we have is the checkstring
                    # self._servermap.mark_bad_share(server, shnum, checkstring)
                    #  that will make publish overwrite the share next time,
                    #  but it won't re-query the server, and it won't make
                    #  mapupdate search further

                    # TODO later: when publish starts, do
                    # servermap.get_best_version(), extract the seqnum,
                    # subtract one, and store as highest-replaceable-seqnum.
                    # Then, if this surprise-because-we-didn't-ask share is
                    # of highest-replaceable-seqnum or lower, we're allowed
                    # to replace it: send out a new writev (or rather add it
                    # to self.goal and loop).

            surprised = True

        if surprised:
            self.log("they had shares %s that we didn't know about" %
                     (list(surprise_shares),),
                     parent=lp, level=log.WEIRD, umid="un9CSQ")
            self.surprised = True

        if not wrote:
            # TODO: there are two possibilities. The first is that the server
            # is full (or just doesn't want to give us any room), which means
            # we shouldn't ask them again, but is *not* an indication of an
            # uncoordinated write. The second is that our testv failed, which
            # *does* indicate an uncoordinated write. We currently don't have
            # a way to tell these two apart (in fact, the storage server code
            # doesn't have the option of refusing our share).
            #
            # If the server is full, mark the server as bad (so we don't ask
            # them again), but don't set self.surprised. The loop() will find
            # a new server.
            #
            # If the testv failed, log it, set self.surprised, but don't
            # bother adding to self.bad_servers.

            self.log("our testv failed, so the write did not happen",
                     parent=lp, level=log.WEIRD, umid="8sc26g")
            self.surprised = True
            self.bad_servers.add(server) # don't ask them again
            # use the checkstring to add information to the log message
            unknown_format = False
            for (shnum,readv) in list(read_data.items()):
                checkstring = readv[0]
                version = get_version_from_checkstring(checkstring)
                if version == MDMF_VERSION:
                    (other_seqnum,
                     other_roothash) = unpack_mdmf_checkstring(checkstring)
                elif version == SDMF_VERSION:
                    (other_seqnum,
                     other_roothash,
                     other_IV) = unpack_sdmf_checkstring(checkstring)
                else:
                    unknown_format = True
                expected_version = self._servermap.version_on_server(server,
                                                                     shnum)
                if expected_version:
                    (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                     offsets_tuple) = expected_version
                    msg = ("somebody modified the share on us:"
                           " shnum=%d: I thought they had #%d:R=%r," %
                           (shnum,
                            seqnum, base32.b2a(root_hash)[:4]))
                    if unknown_format:
                        msg += (" but I don't know how to read share"
                                " format %d" % version)
                    else:
                        msg += " but testv reported #%d:R=%r" % \
                               (other_seqnum, base32.b2a(other_roothash)[:4])
                    self.log(msg, parent=lp, level=log.NOISY)
                # if expected_version==None, then we didn't expect to see a
                # share on that server, and the 'surprise_shares' clause
                # above will have logged it.
            return

        # and update the servermap
        # self.versioninfo is set during the last phase of publishing.
        # If we get there, we know that responses correspond to placed
        # shares, and can safely execute these statements.
        if self.versioninfo:
            self.log("wrote successfully: adding new share to servermap")
            self._servermap.add_new_share(server, writer.shnum,
                                          self.versioninfo, started)
            self.placed.add( (server, writer.shnum) )
            self._update_status()
        # the next method in the deferred chain will check to see if
        # we're done and successful.
        return


    def _done(self):
        if not self._running:
            return
        self._running = False
        now = time.time()
        self._status.timings["total"] = now - self._started

        elapsed = now - self._started_pushing
        self._status.timings['push'] = elapsed

        self._status.set_active(False)
        self.log("Publish done, success")
        self._status.set_status("Finished")
        self._status.set_progress(1.0)
        # Get k and segsize, then give them to the caller.
        hints = {}
        hints['segsize'] = self.segment_size
        hints['k'] = self.required_shares
        self._node.set_downloader_hints(hints)
        eventually(self.done_deferred.callback, None)

    def _failure(self, f=None):
        if f:
            self._last_failure = f

        if not self.surprised:
            # We ran out of servers
            msg = "Publish ran out of good servers"
            if self._last_failure:
                msg += ", last failure was: %s" % str(self._last_failure)
            self.log(msg)
            e = NotEnoughServersError(msg)

        else:
            # We ran into shares that we didn't recognize, which means
            # that we need to return an UncoordinatedWriteError.
            self.log("Publish failed with UncoordinatedWriteError")
            e = UncoordinatedWriteError()
        f = failure.Failure(e)
        eventually(self.done_deferred.callback, f)


@implementer(IMutableUploadable)
class MutableFileHandle(object):
    """
    I am a mutable uploadable built around a filehandle-like object,
    usually either a BytesIO instance or a handle to an actual file.
    """

    def __init__(self, filehandle):
        # The filehandle is defined as a generally file-like object that
        # has these two methods. We don't care beyond that.
        assert hasattr(filehandle, "read")
        assert hasattr(filehandle, "close")

        self._filehandle = filehandle
        # We must start reading at the beginning of the file, or we risk
        # encountering errors when the data read does not match the size
        # reported to the uploader.
        self._filehandle.seek(0)

        # We have not yet read anything, so our position is 0.
        self._marker = 0


    def get_size(self):
        """
        I return the amount of data in my filehandle.
        """
        if not hasattr(self, "_size"):
            old_position = self._filehandle.tell()
            # Seek to the end of the file by seeking 0 bytes from the
            # file's end
            self._filehandle.seek(0, os.SEEK_END)
            self._size = self._filehandle.tell()
            # Restore the previous position, in case this was called
            # after a read.
            self._filehandle.seek(old_position)
            assert self._filehandle.tell() == old_position

        assert hasattr(self, "_size")
        return self._size


    def pos(self):
        """
        I return the position of my read marker -- i.e., how much data I
        have already read and returned to callers.
        """
        return self._marker


    def read(self, length):
        """
        I return some data (up to length bytes) from my filehandle.

        In most cases, I return length bytes, but sometimes I won't --
        for example, if I am asked to read beyond the end of a file, or
        an error occurs.
        """
        results = self._filehandle.read(length)
        self._marker += len(results)
        return [results]


    def close(self):
        """
        I close the underlying filehandle. Any further operations on the
        filehandle fail at this point.
        """
        self._filehandle.close()


class MutableData(MutableFileHandle):
    """
    I am a mutable uploadable built around a string, which I then cast
    into a BytesIO and treat as a filehandle.
    """

    def __init__(self, s):
        # Take a string and return a file-like uploadable.
        assert isinstance(s, bytes)

        MutableFileHandle.__init__(self, BytesIO(s))


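# Illustrative usage of the uploadables above (a sketch only; `filenode` is
# assumed to be an existing mutable filenode obtained elsewhere, and the
# overwrite() call is shown as one typical consumer of IMutableUploadable):
#
#     from allmydata.mutable.publish import MutableData
#     d = filenode.overwrite(MutableData(b"new contents"))
#
# MutableFileHandle(open("local-file", "rb")) works the same way for data
# that already lives in a file.
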
@implementer(IMutableUploadable)
class TransformingUploadable(object):
    """
    I am an IMutableUploadable that wraps another IMutableUploadable,
    and some segments that are already on the grid. When I am called to
    read, I handle merging of boundary segments.
    """


    def __init__(self, data, offset, segment_size, start, end):
        assert IMutableUploadable.providedBy(data)

        self._newdata = data
        self._offset = offset
        self._segment_size = segment_size
        self._start = start
        self._end = end

        self._read_marker = 0

        self._first_segment_offset = offset % segment_size

        num = self.log("TransformingUploadable: starting", parent=None)
        self._log_number = num
        self.log("got fso: %d" % self._first_segment_offset)
        self.log("got offset: %d" % self._offset)


    def log(self, *args, **kwargs):
        if 'parent' not in kwargs:
            kwargs['parent'] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.transforminguploadable"
        return log.msg(*args, **kwargs)


    def get_size(self):
        return self._offset + self._newdata.get_size()


    def read(self, length):
        # We can get data from 3 sources here.
        #   1. The first of the segments provided to us.
        #   2. The data that we're replacing things with.
        #   3. The last of the segments provided to us.

        # are we in state 0?
        self.log("reading %d bytes" % length)

        old_start_data = b""
        old_data_length = self._first_segment_offset - self._read_marker
        if old_data_length > 0:
            if old_data_length > length:
                old_data_length = length
            self.log("returning %d bytes of old start data" % old_data_length)

            old_data_end = old_data_length + self._read_marker
            old_start_data = self._start[self._read_marker:old_data_end]
            length -= old_data_length
        else:
            # otherwise calculations later get screwed up.
            old_data_length = 0

        # Is there enough new data to satisfy this read? If not, we need
        # to pad the end of the data with data from our last segment.
        old_end_length = length - \
            (self._newdata.get_size() - self._newdata.pos())
        old_end_data = b""
        if old_end_length > 0:
            self.log("reading %d bytes of old end data" % old_end_length)

            # TODO: We're not explicitly checking for tail segment size
            # here. Is that a problem?
            old_data_offset = (length - old_end_length + \
                               old_data_length) % self._segment_size
            self.log("reading at offset %d" % old_data_offset)
            old_end = old_data_offset + old_end_length
            old_end_data = self._end[old_data_offset:old_end]
            length -= old_end_length
            assert length == self._newdata.get_size() - self._newdata.pos()

        self.log("reading %d bytes of new data" % length)
        new_data = self._newdata.read(length)
        new_data = b"".join(new_data)

        self._read_marker += len(old_start_data + new_data + old_end_data)

        return old_start_data + new_data + old_end_data

    def close(self):
        pass