Ticket #393: 393status8.dpatch

File 393status8.dpatch, 259.8 KB (added by kevan, at 2010-06-25T00:15:18Z)
1Thu Jun 24 16:34:23 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
2  * Add MDMF reader and writer
3 
4  The MDMF/SDMF reader and MDMF writer are similar to the object proxies
5  that exist for immutable files. They abstract away details of
6  connection, state, and caching from their callers (in this case, the
7  download, servermap updater, and uploader), and expose methods to get
8  and set information on the remote server.
9 
10  MDMFSlotReadProxy reads a mutable file from the server, doing the right
11  thing (in most cases) regardless of whether the file is MDMF or SDMF. It
12  allows callers to tell it how to batch and flush reads.
13 
14  MDMFSlotWriteProxy writes an MDMF mutable file to a server.
15 
16  This patch also includes tests for MDMFSlotReadProxy and
17  MDMFSlotWriteProxy.
18
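For orientation, here is a rough sketch of the intended caller-side write
flow, using the method names this patch introduces below. The Deferred
chaining, the caller-side names (encoded blocks, salts, hash trees, keys,
and the sign() helper), and the omission of error handling are
illustrative assumptions, not part of the patch:

    from twisted.internet import defer

    proxy = MDMFSlotWriteProxy(shnum, rref, storage_index, secrets,
                               seqnum, k, N, segment_size, data_length)
    proxy.set_checkstring(old_checkstring)  # omit for a brand-new share
    d = defer.succeed(None)
    for segnum, (block, salt) in enumerate(encoded_blocks_and_salts):
        d.addCallback(lambda ign, b=block, n=segnum, s=salt:
                          proxy.put_block(b, n, s))
    d.addCallback(lambda ign: proxy.put_encprivkey(encprivkey))
    d.addCallback(lambda ign: proxy.put_blockhashes(block_hash_tree))
    d.addCallback(lambda ign: proxy.put_salthashes(salt_hash_tree))
    d.addCallback(lambda ign: proxy.put_sharehashes(share_hash_chain))
    d.addCallback(lambda ign: proxy.put_root_hash(root_hash))
    d.addCallback(lambda ign: proxy.put_signature(sign(proxy.get_signable())))
    d.addCallback(lambda ign: proxy.put_verification_key(verification_key))
    d.addCallback(lambda ign: proxy.finish_publishing())

Each put_* call becomes one slot_testv_and_readv_and_writev message, with
the proxy's current checkstring supplied as the test vector.
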
19Thu Jun 24 16:38:43 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
20  * Write a segmented mutable downloader
21 
22  The segmented mutable downloader can deal with MDMF files (files with
23  one or more segments in MDMF format) and SDMF files (files with one
24  segment in SDMF format). It is backwards compatible with the old
25  file format.
26 
27  This patch also contains tests for the segmented mutable downloader.
28
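As a similarly hedged illustration of the read side, the downloader might
drive MDMFSlotReadProxy for one share roughly as follows (the helper name
and the bookkeeping dict are made up; hash validation and the retrieve
state machine are elided):

    from twisted.internet import defer

    def fetch_share(rref, storage_index, shnum, num_segments):
        # Hypothetical helper: gather the pieces needed to validate and
        # decode one share, relying on the reader's own offset handling.
        reader = MDMFSlotReadProxy(rref, storage_index, shnum)
        pieces = {"blocks": {}}
        d = defer.succeed(None)
        d.addCallback(lambda ign: reader.get_verification_key())
        d.addCallback(lambda vk: pieces.update(verification_key=vk))
        d.addCallback(lambda ign: reader.get_signature())
        d.addCallback(lambda sig: pieces.update(signature=sig))
        d.addCallback(lambda ign: reader.get_blockhashes())
        d.addCallback(lambda bht: pieces.update(block_hash_tree=bht))
        for segnum in range(num_segments):
            d.addCallback(lambda ign, n=segnum: reader.get_block_and_salt(n))
            d.addCallback(lambda bs, n=segnum: pieces["blocks"].update({n: bs}))
        d.addCallback(lambda ign: pieces)
        return d
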
29Thu Jun 24 16:42:08 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
30  * Alter the ServermapUpdater to find MDMF files
31 
32  The servermapupdater should find MDMF files on a grid in the same way
33  that it finds SDMF files. This patch makes it do that.
34
35Thu Jun 24 16:44:10 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
36  * Make a segmented mutable uploader
37 
38  The mutable file uploader should be able to publish files with one
39  segment and files with multiple segments. This patch makes it do that.
40  This is still incomplete, and rather ugly -- I need to flesh out error
41  handling, I need to write tests, and I need to remove some of the uglier
42  kludges in the process before I can call this done.
43
44Thu Jun 24 16:46:37 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
45  * Misc. changes to support the work I'm doing
46 
47      - Add a notion of file version number to interfaces.py
48      - Alter mutable file node interfaces to have a notion of version,
49        though this may be changed later.
50      - Alter mutable/filenode.py to conform to these changes.
51      - Add a salt hasher to util/hashutil.py (a rough sketch follows below)
52
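As a rough idea only, the salt hasher added to util/hashutil.py can be a
thin wrapper over hashutil's existing tagged-hash convention; the tag
string and function name below are guesses, not necessarily what the
patch actually uses:

    from allmydata.util.hashutil import tagged_hash

    # Hypothetical tag; the patch may well choose a different one.
    MUTABLE_SALT_TAG = "allmydata_mutable_segment_salt_v1"

    def mutable_salt_hash(data):
        # SHA-256d under a fixed tag, like the other hashers in hashutil.
        return tagged_hash(MUTABLE_SALT_TAG, data)
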
53Thu Jun 24 16:48:33 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
54  * nodemaker.py: create MDMF files when asked to
55
56Thu Jun 24 16:49:05 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
57  * storage/server.py: minor code cleanup
58
59Thu Jun 24 16:49:24 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
60  * test/test_mutable.py: alter some tests that were failing due to MDMF; minor code cleanup.
61
62New patches:
63
64[Add MDMF reader and writer
65Kevan Carstensen <kevan@isnotajoke.com>**20100624233423
66 Ignore-this: 4e00d2790c6c28c1424c42c0595c324a
67 
68 The MDMF/SDMF reader and MDMF writer are similar to the object proxies
69 that exist for immutable files. They abstract away details of
70 connection, state, and caching from their callers (in this case, the
71 download, servermap updater, and uploader), and expose methods to get
72 and set information on the remote server.
73 
74 MDMFSlotReadProxy reads a mutable file from the server, doing the right
75 thing (in most cases) regardless of whether the file is MDMF or SDMF. It
76 allows callers to tell it how to batch and flush reads.
77 
78 MDMFSlotWriteProxy writes an MDMF mutable file to a server.
79 
80 This patch also includes tests for MDMFSlotReadProxy and
81 MDMFSlotWriteProxy.
82] {
83hunk ./src/allmydata/mutable/layout.py 4
84 
85 import struct
86 from allmydata.mutable.common import NeedMoreDataError, UnknownVersionError
87+from allmydata.interfaces import HASH_SIZE, SALT_SIZE, SDMF_VERSION, \
88+                                 MDMF_VERSION
89+from allmydata.util import mathutil, observer
90+from twisted.python import failure
91+from twisted.internet import defer
92+
93+
94+# These strings describe the format of the packed structs they help process
95+# Here's what they mean:
96+#
97+#  PREFIX:
98+#    >: Big-endian byte order; the most significant byte is first (leftmost).
99+#    B: The version information; an 8-bit version identifier, stored as
100+#       an unsigned char. This is currently 0; our modifications
101+#       will turn it into 1.
102+#    Q: The sequence number; this is sort of like a revision history for
103+#       mutable files; they start at 1 and increase as they are changed after
104+#       being uploaded. Stored as an unsigned long long, which is 8 bytes in
105+#       length.
106+#  32s: The root hash of the share hash tree. We use sha-256d, so we use 32
107+#       characters = 32 bytes to store the value.
108+#  16s: The salt for the readkey. This is a 16-byte random value, stored as
109+#       16 characters.
110+#
111+#  SIGNED_PREFIX additions, things that are covered by the signature:
112+#    B: The "k" encoding parameter. We store this as an 8-bit character,
113+#       which is convenient because our erasure coding scheme cannot
114+#       encode if you ask for more than 255 pieces.
115+#    B: The "N" encoding parameter. Stored as an 8-bit character for the
116+#       same reasons as above.
117+#    Q: The segment size of the uploaded file. This will essentially be the
118+#       length of the file in SDMF. An unsigned long long, so we can store
119+#       files of quite large size.
120+#    Q: The data length of the uploaded file. Modulo padding, this will be
121+#       the same as the segment size field. Like the segment size field, it is
122+#       an unsigned long long and can be quite large.
123+#
124+#   HEADER additions:
125+#     L: The offset of the signature. An unsigned long.
126+#     L: The offset of the share hash chain. An unsigned long.
127+#     L: The offset of the block hash tree. An unsigned long.
128+#     L: The offset of the share data. An unsigned long.
129+#     Q: The offset of the encrypted private key. An unsigned long long, to
130+#        account for the possibility of a lot of share data.
131+#     Q: The offset of the EOF. An unsigned long long, to account for the
132+#        possibility of a lot of share data.
133+#
134+#  After all of these, we have the following:
135+#    - The verification key: Occupies the space between the end of the header
136+#      and the start of the signature (i.e. data[HEADER_LENGTH:o['signature']]).
137+#    - The signature, which goes from the signature offset to the share hash
138+#      chain offset.
139+#    - The share hash chain, which goes from the share hash chain offset to
140+#      the block hash tree offset.
141+#    - The share data, which goes from the share data offset to the encrypted
142+#      private key offset.
143+#    - The encrypted private key, which goes from its offset until the end of the file.
144+#
145+#  The block hash tree in this encoding has only one leaf, so the offset of
146+#  the share data will be 32 bytes more than the offset of the block hash tree.
147+#  Given this, we may need to check to see how many bytes a reasonably sized
148+#  block hash tree will take up.
149 
150 PREFIX = ">BQ32s16s" # each version has a different prefix
151 SIGNED_PREFIX = ">BQ32s16s BBQQ" # this is covered by the signature
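For reference, a quick check of the sizes these SDMF format strings imply
(worked out with struct.calcsize; not part of the patch). The 75-byte
signed prefix is why the SDMF offset table is later read starting at
byte 75:

    import struct
    assert struct.calcsize(PREFIX) == 1 + 8 + 32 + 16            # 57 bytes
    assert struct.calcsize(SIGNED_PREFIX) == 57 + 1 + 1 + 8 + 8  # 75 bytes
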
152hunk ./src/allmydata/mutable/layout.py 191
153     return (share_hash_chain, block_hash_tree, share_data)
154 
155 
156-def pack_checkstring(seqnum, root_hash, IV):
157+def pack_checkstring(seqnum, root_hash, IV, version=0):
158     return struct.pack(PREFIX,
159hunk ./src/allmydata/mutable/layout.py 193
160-                       0, # version,
161+                       version,
162                        seqnum,
163                        root_hash,
164                        IV)
165hunk ./src/allmydata/mutable/layout.py 266
166                            encprivkey])
167     return final_share
168 
169+def pack_prefix(seqnum, root_hash, IV,
170+                required_shares, total_shares,
171+                segment_size, data_length):
172+    prefix = struct.pack(SIGNED_PREFIX,
173+                         0, # version,
174+                         seqnum,
175+                         root_hash,
176+                         IV,
177+                         required_shares,
178+                         total_shares,
179+                         segment_size,
180+                         data_length,
181+                         )
182+    return prefix
183+
184+
185+MDMFHEADER = ">BQ32s32sBBQQ LQQQQQQQ"
186+MDMFHEADERWITHOUTOFFSETS = ">BQ32s32sBBQQ"
187+MDMFHEADERSIZE = struct.calcsize(MDMFHEADER)
188+MDMFCHECKSTRING = ">BQ32s32s"
189+MDMFSIGNABLEHEADER = ">BQ32s32sBBQQ"
190+MDMFOFFSETS = ">LQQQQQQQ"
191+
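# Illustrative size check (not part of the patch): these sizes are where
# the 91- and 60-byte reads in MDMFSlotReadProxy below come from.
assert struct.calcsize(MDMFHEADERWITHOUTOFFSETS) == 91   # signed fields
assert struct.calcsize(MDMFOFFSETS) == 60                # eight offsets
assert MDMFHEADERSIZE == 91 + 60                         # == 151 bytes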
192+class MDMFSlotWriteProxy:
193+    #implements(IMutableSlotWriter) TODO
194+
195+    """
196+    I represent a remote write slot for an MDMF mutable file.
197+
198+    I abstract away from my caller the details of block and salt
199+    management, and the implementation of the on-disk format for MDMF
200+    shares.
201+    """
202+
203+    # Expected layout, MDMF:
204+    # offset:     size:       name:
205+    #-- signed part --
206+    # 0           1           version number (01)
207+    # 1           8           sequence number
208+    # 9           32          share tree root hash
209+    # 41          32          salt tree root hash
210+    # 73          1           The "k" encoding parameter
211+    # 74          1           The "N" encoding parameter
212+    # 75          8           The segment size of the uploaded file
213+    # 83          8           The data length of the uploaded file
214+    #-- end signed part --
215+    # 91          4           The offset of the share data
216+    # 95          8           The offset of the encrypted private key
217+    # 103         8           The offset of the block hash tree
218+    # 111         8           The offset of the salt hash tree
219+    # 119         8           The offset of the share hash chain
220+    # 127         8           The offset of the signature
221+    # 135         8           The offset of the verification key
222+    # 143         8           offset of the EOF
223+    #
224+    # followed by salts, share data, the encrypted private key, the
225+    # block hash tree, the salt hash tree, the share hash chain, a
226+    # signature over the first eight fields, and a verification key.
227+    #
228+    # The checkstring is the first four fields -- the version number,
229+    # sequence number, root hash and root salt hash. This is consistent
230+    # in meaning with what we have for SDMF files, except now instead of
231+    # using the literal salt, we use a value derived from all of the
232+    # salts.
233+    #
234+    # The ordering of the offsets is different to reflect the dependencies
235+    # that we'll run into with an MDMF file. The expected write flow is
236+    # something like this:
237+    #
238+    #   0: Initialize with the sequence number, encoding
239+    #      parameters and data length. From this, we can deduce the
240+    #      number of segments, and from that we can deduce the size of
241+    #      the AES salt field, telling us where to write AES salts, and
242+    #      where to write share data. We can also figure out where the
243+    #      encrypted private key should go, because we can figure out
244+    #      how big the share data will be.
245+    #
246+    #   1: Encrypt, encode, and upload the file in chunks. Do something
247+    #      like
248+    #
249+    #       put_block(data, segnum, salt)
250+    #
251+    #      to write a block and a salt to the disk. We can do both of
252+    #      these operations now because we have enough of the offsets to
253+    #      know where to put them.
254+    #
255+    #   2: Put the encrypted private key. Use:
256+    #
257+    #        put_encprivkey(encprivkey)
258+    #
259+    #      Now that we know the length of the private key, we can fill
260+    #      in the offset for the block hash tree.
261+    #
262+    #   3: We're now in a position to upload the block hash tree for
263+    #      a share. Put that using something like:
264+    #       
265+    #        put_blockhashes(block_hash_tree)
266+    #
267+    #      Note that block_hash_tree is a list of hashes -- we'll take
268+    #      care of the details of serializing that appropriately. When
269+    #      we get the block hash tree, we are also in a position to
270+    #      calculate the offset for the share hash chain, and fill that
271+    #      into the offsets table.
272+    #
273+    #   4: At the same time, we're in a position to upload the salt hash
274+    #      tree. This is a Merkle tree over all of the salts. We use a
275+    #      Merkle tree so that we can validate each (block, salt) pair as
276+    #      we download them later. We do this using
277+    #
278+    #        put_salthashes(salt_hash_tree)
279+    #
280+    #      When you do this, I automatically put the root of the tree
281+    #      (the hash at index 0 of the list) in its appropriate slot in
282+    #      the signed prefix of the share.
283+    #
284+    #   5: We're now in a position to upload the share hash chain for
285+    #      a share. Do that with something like:
286+    #     
287+    #        put_sharehashes(share_hash_chain)
288+    #
289+    #      share_hash_chain should be a dictionary mapping shnums to
290+    #      32-byte hashes -- the wrapper handles serialization.
291+    #      We'll know where to put the signature at this point, also.
292+    #      The root of this tree will be put explicitly in the next
293+    #      step.
294+    #
295+    #      TODO: Why? Why not just include it in the tree here?
296+    #
297+    #   6: Before putting the signature, we must first put the
298+    #      root_hash. Do this with:
299+    #
300+    #        put_root_hash(root_hash).
301+    #     
302+    #      In terms of knowing where to put this value, it was always
303+    #      possible to place it, but it makes sense semantically to
304+    #      place it after the share hash tree, so that's why you do it
305+    #      in this order.
306+    #
307+    #   7: With the root hash put, we can now sign the header. Use:
308+    #
309+    #        get_signable()
310+    #
311+    #      to get the part of the header that you want to sign, and use:
312+    #       
313+    #        put_signature(signature)
314+    #
315+    #      to write your signature to the remote server.
316+    #
317+    #   8: Add the verification key, and finish. Do:
318+    #
319+    #        put_verification_key(key)
320+    #
321+    #      and
322+    #
323+    #        finish_publishing()
324+    #
325+    # Checkstring management:
326+    #
327+    # To write to a mutable slot, we have to provide test vectors to ensure
328+    # that we are writing to the same data that we think we are. These
329+    # vectors allow us to detect uncoordinated writes; that is, writes
330+    # where both we and some other shareholder are writing to the
331+    # mutable slot, and to report those back to the parts of the program
332+    # doing the writing.
333+    #
334+    # With SDMF, this was easy -- all of the share data was written in
335+    # one go, so it was easy to detect uncoordinated writes, and we only
336+    # had to do it once. With MDMF, not all of the file is written at
337+    # once.
338+    #
339+    # If a share is new, we write out as much of the header as we can
340+    # before writing out anything else. This gives other writers a
341+    # canary that they can use to detect uncoordinated writes, and, if
342+    # they do the same thing, gives us the same canary. We then update
343+    # the share. We won't be able to write out two fields of the header
344+    # -- the share tree hash and the salt hash -- until we finish
345+    # writing out the share. We only require the writer to provide the
346+    # initial checkstring, and keep track of what it should be after
347+    # updates ourselves.
348+    #
349+    # If we haven't written anything yet, then on the first write (which
350+    # will probably be a block + salt of a share), we'll also write out
351+    # the header. On subsequent passes, we'll expect to see the header.
352+    # This changes in two places:
353+    #
354+    #   - When we write out the salt hash
355+    #   - When we write out the root of the share hash tree
356+    #
357+    # since these values will change the header. It is possible that we
358+    # can just make those be written in one operation to minimize
359+    # disruption.
360+    def __init__(self,
361+                 shnum,
362+                 rref, # a remote reference to a storage server
363+                 storage_index,
364+                 secrets, # (write_enabler, renew_secret, cancel_secret)
365+                 seqnum, # the sequence number of the mutable file
366+                 required_shares,
367+                 total_shares,
368+                 segment_size,
369+                 data_length): # the length of the original file
370+        self._shnum = shnum
371+        self._rref = rref
372+        self._storage_index = storage_index
373+        self._seqnum = seqnum
374+        self._required_shares = required_shares
375+        assert self._shnum >= 0 and self._shnum < total_shares
376+        self._total_shares = total_shares
377+        # We build up the offset table as we write things. It is the
378+        # last thing we write to the remote server.
379+        self._offsets = {}
380+        self._testvs = []
381+        self._secrets = secrets
382+        # The segment size needs to be a multiple of the k parameter --
383+        # any padding should have been carried out by the publisher
384+        # already.
385+        assert segment_size % required_shares == 0
386+        self._segment_size = segment_size
387+        self._data_length = data_length
388+
389+        # These are set later -- we define them here so that we can
390+        # check for their existence easily
391+
392+        # This is the root of the share hash tree -- the Merkle tree
393+        # over the roots of the block hash trees computed for shares in
394+        # this upload.
395+        self._root_hash = None
396+        # This is the root of the salt hash tree -- the Merkle tree over
397+        # the hashes of the salts used for each segment of the file.
398+        self._salt_hash = None
399+
400+        # We haven't yet written anything to the remote bucket. By
401+        # setting this, we tell the _write method as much. The write
402+        # method will then know that it also needs to add a write vector
403+        # for the checkstring (or what we have of it) to the first write
404+        # request. We'll then record that value for future use.  If
405+        # we're expecting something to be there already, we need to call
406+        # set_checkstring before we write anything to tell the first
407+        # write about that.
408+        self._written = False
409+
410+        # When writing data to the storage servers, we get a read vector
411+        # for free. We'll read the checkstring, which will help us
412+        # figure out what's gone wrong if a write fails.
413+        self._readv = [(0, struct.calcsize(MDMFCHECKSTRING))]
414+
415+        # We calculate the number of segments because it tells us
416+        # where the salt part of the file ends and the share data begins,
417+        # and also because it provides a useful amount of bounds checking.
418+        self._num_segments = mathutil.div_ceil(self._data_length,
419+                                               self._segment_size)
420+        self._block_size = self._segment_size / self._required_shares
421+        # We also calculate the share size, to help us with block
422+        # constraints later.
423+        tail_size = self._data_length % self._segment_size
424+        if not tail_size:
425+            self._tail_block_size = self._block_size
426+        else:
427+            self._tail_block_size = mathutil.next_multiple(tail_size,
428+                                                           self._required_shares)
429+            self._tail_block_size /= self._required_shares
430+
431+        # We already know where the AES salts start; right after the end
432+        # of the header (which is defined as the signable part + the offsets)
433+        # We need to calculate where the share data starts, since we're
434+        # responsible (after this method) for being able to write it.
435+        self._offsets['share-data'] = MDMFHEADERSIZE
436+        self._offsets['share-data'] += self._num_segments * SALT_SIZE
437+        # We can also calculate where the encrypted private key begins
438+        # from what we now know.
439+        self._offsets['enc_privkey'] = self._offsets['share-data']
440+        self._offsets['enc_privkey'] += self._block_size * (self._num_segments - 1)
441+        self._offsets['enc_privkey'] += self._tail_block_size
442+        # We'll wait for the rest. Callers can now call my "put_block" and
443+        # "set_checkstring" methods.
444+
445+
446+    def set_checkstring(self, checkstring):
447+        """
448+        Set the checkstring for the given shnum.
449+
450+        By default, I assume that I am writing new shares to the grid.
451+        If you don't explicitly set your own checkstring, I will use
452+        one that requires that the remote share not exist. You will want
453+        to use this method if you are updating a share in-place;
454+        otherwise, writes will fail.
455+        """
456+        # You're allowed to overwrite checkstrings with this method;
457+        # I assume that users know what they are doing when they call
458+        # it.
459+        if checkstring == "":
460+            # We special-case this, since len("") = 0, but we need
461+            # length of 1 for the case of an empty share to work on the
462+            # storage server, which is what a checkstring that is the
463+            # empty string means.
464+            self._testvs = []
465+        else:
466+            self._testvs = []
467+            self._testvs.append((0, len(checkstring), "eq", checkstring))
468+
469+
470+    def __repr__(self):
471+        return "MDMFSlotWriteProxy for share %d" % self._shnum
472+
473+
474+    def get_checkstring(self):
475+        """
476+        Given a share number, I return a representation of what the
477+        checkstring for that share on the server will look like.
478+        """
479+        if self._root_hash:
480+            roothash = self._root_hash
481+        else:
482+            roothash = "\x00" * 32
483+        # self._salt_hash and self._root_hash means that we've written
484+        # both of these things to the server. self._salt_hash will be
485+        # set first, though, and if self._root_hash isn't also set then
486+        # neither of them are written to the server, so we need to leave
487+        # them alone.
488+        if self._salt_hash and self._root_hash:
489+            salthash = self._salt_hash
490+        else:
491+            salthash = "\x00" * 32
492+        checkstring = struct.pack(MDMFCHECKSTRING,
493+                                  1,
494+                                  self._seqnum,
495+                                  roothash,
496+                                  salthash)
497+        return checkstring
498+
499+
500+    def put_block(self, data, segnum, salt):
501+        """
502+        Put the encrypted-and-encoded data segment in the slot, along
503+        with the salt.
504+        """
505+        if segnum >= self._num_segments:
506+            raise LayoutInvalid("I won't overwrite the private key")
507+        if len(salt) != SALT_SIZE:
508+            raise LayoutInvalid("I was given a salt of size %d, but "
509+                                "I wanted a salt of size %d" % (len(salt), SALT_SIZE))
510+        if segnum + 1 == self._num_segments:
511+            if len(data) != self._tail_block_size:
512+                raise LayoutInvalid("I was given the wrong size block to write")
513+        elif len(data) != self._block_size:
514+            raise LayoutInvalid("I was given the wrong size block to write")
515+
516+        # We want to write at offsets['share-data'] + segnum * block_size.
517+        assert self._offsets
518+        assert self._offsets['share-data']
519+
520+        offset = self._offsets['share-data'] + segnum * self._block_size
521+        datavs = [tuple([offset, data])]
522+        # We also have to write the salt. This is at:
523+        salt_offset = MDMFHEADERSIZE + SALT_SIZE * segnum
524+        datavs.append(tuple([salt_offset, salt]))
525+        return self._write(datavs)
526+
527+
528+    def put_encprivkey(self, encprivkey):
529+        """
530+        Put the encrypted private key in the remote slot.
531+        """
532+        assert self._offsets
533+        assert self._offsets['enc_privkey']
534+        # You shouldn't re-write the encprivkey after the block hash
535+        # tree is written, since that could cause the private key to run
536+        # into the block hash tree. Before it writes the block hash
537+        # tree, the block hash tree writing method writes the offset of
538+        # the salt hash tree. So that's a good indicator of whether or
539+        # not the block hash tree has been written.
540+        if "salt_hash_tree" in self._offsets:
541+            raise LayoutInvalid("You must write this before the block hash tree")
542+
543+        self._offsets['block_hash_tree'] = self._offsets['enc_privkey'] + len(encprivkey)
544+        datavs = [(tuple([self._offsets['enc_privkey'], encprivkey]))]
545+        def _on_failure():
546+            del(self._offsets['block_hash_tree'])
547+        return self._write(datavs, on_failure=_on_failure)
548+
549+
550+    def put_blockhashes(self, blockhashes):
551+        """
552+        Put the block hash tree in the remote slot.
553+
554+        The encrypted private key must be put before the block hash
555+        tree, since we need to know how large it is to know where the
556+        block hash tree should go. The block hash tree must be put
557+        before the salt hash tree, since its size determines the
558+        offset of the share hash chain.
559+        """
560+        assert self._offsets
561+        assert isinstance(blockhashes, list)
562+        if "block_hash_tree" not in self._offsets:
563+            raise LayoutInvalid("You must put the encrypted private key "
564+                                "before you put the block hash tree")
565+        # Writing the salt hash tree defines the share hash chain offset,
566+        # so its presence tells us the salt hash tree was already written.
567+        if "share_hash_chain" in self._offsets:
568+            raise LayoutInvalid("You must put the block hash tree before "
569+                                "you put the salt hash tree")
570+        blockhashes_s = "".join(blockhashes)
571+        self._offsets['salt_hash_tree'] = self._offsets['block_hash_tree'] + len(blockhashes_s)
572+        datavs = []
573+        datavs.append(tuple([self._offsets['block_hash_tree'], blockhashes_s]))
574+        def _on_failure():
575+            del(self._offsets['salt_hash_tree'])
576+        return self._write(datavs, on_failure=_on_failure)
577+
578+
579+    def put_salthashes(self, salthashes):
580+        """
581+        Put the salt hash tree in the remote slot.
582+
583+        The block hash tree must be put before the salt hash tree, since
584+        its size tells us where we need to put the salt hash tree. This
585+        method must be called before the share hash chain can be
586+        uploaded, since the size of the salt hash tree tells us where
587+        the share hash chain can go.
588+        """
589+        assert self._offsets
590+        assert isinstance(salthashes, list)
591+        if "salt_hash_tree" not in self._offsets:
592+            raise LayoutInvalid("You must put the block hash tree "
593+                                "before putting the salt hash tree")
594+        if "signature" in self._offsets:
595+            raise LayoutInvalid("You must put the salt hash tree "
596+                                "before you put the share hash chain")
597+        # The root of the salt hash tree is at index 0. We'll write this when
598+        # we put the root hash later; we just keep track of it for now.
599+        self._salt_hash = salthashes[0]
600+        salthashes_s = "".join(salthashes[1:])
601+        self._offsets['share_hash_chain'] = self._offsets['salt_hash_tree'] + len(salthashes_s)
602+        datavs = []
603+        datavs.append(tuple([self._offsets['salt_hash_tree'], salthashes_s]))
604+        def _on_failure():
605+            del(self._offsets['share_hash_chain'])
606+        return self._write(datavs, on_failure=_on_failure)
607+
608+
609+    def put_sharehashes(self, sharehashes):
610+        """
611+        Put the share hash chain in the remote slot.
612+
613+        The salt hash tree must be put before the share hash chain,
614+        since we need to know where the salt hash tree ends before we
615+        can know where the share hash chain starts. The share hash chain
616+        must be put before the signature, since the length of the packed
617+        share hash chain determines the offset of the signature. Also,
618+        semantically, you must know what the root of the salt hash tree
619+        is before you can generate a valid signature.
620+        """
621+        assert isinstance(sharehashes, dict)
622+        if "share_hash_chain" not in self._offsets:
623+            raise LayoutInvalid("You need to put the salt hash tree before "
624+                                "you can put the share hash chain")
625+        # The signature comes after the share hash chain. If the
626+        # signature has already been written, we must not write another
627+        # share hash chain. The signature writes the verification key
628+        # offset when it gets sent to the remote server, so we look for
629+        # that.
630+        if "verification_key" in self._offsets:
631+            raise LayoutInvalid("You must write the share hash chain "
632+                                "before you write the signature")
633+        datavs = []
634+        sharehashes_s = "".join([struct.pack(">H32s", i, sharehashes[i])
635+                                  for i in sorted(sharehashes.keys())])
636+        self._offsets['signature'] = self._offsets['share_hash_chain'] + len(sharehashes_s)
637+        datavs.append(tuple([self._offsets['share_hash_chain'], sharehashes_s]))
638+        def _on_failure():
639+            del(self._offsets['signature'])
640+        return self._write(datavs, on_failure=_on_failure)
641+
642+
643+    def put_root_hash(self, roothash):
644+        """
645+        Put the root hash (the root of the share hash tree) in the
646+        remote slot.
647+        """
648+        # It does not make sense to be able to put the root
649+        # hash without first putting the share hashes, since you need
650+        # the share hashes to generate the root hash.
651+        #
652+        # Signature is defined by the routine that places the share hash
653+        # chain, so it's a good thing to look for in finding out whether
654+        # or not the share hash chain exists on the remote server.
655+        if "signature" not in self._offsets:
656+            raise LayoutInvalid("You need to put the share hash chain "
657+                                "before you can put the root share hash")
658+        if len(roothash) != HASH_SIZE:
659+            raise LayoutInvalid("hashes and salts must be exactly %d bytes"
660+                                 % HASH_SIZE)
661+        datavs = []
662+        self._root_hash = roothash
663+        # To write both of these values, we update the checkstring on
664+        # the remote server, which includes them
665+        checkstring = self.get_checkstring()
666+        datavs.append(tuple([0, checkstring]))
667+        # This write, if successful, changes the checkstring, so we need
668+        # to update our internal checkstring to be consistent with the
669+        # one on the server.
670+        def _on_success():
671+            self._testvs = [(0, len(checkstring), "eq", checkstring)]
672+        def _on_failure():
673+            self._root_hash = None
674+            self._salt_hash = None
675+        return self._write(datavs,
676+                           on_success=_on_success,
677+                           on_failure=_on_failure)
678+
679+
680+    def get_signable(self):
681+        """
682+        Get the first eight fields of the mutable file; the parts that
683+        are signed.
684+        """
685+        if not self._root_hash or not self._salt_hash:
686+            raise LayoutInvalid("You need to set the root hash and the "
687+                                "salt hash before getting something to "
688+                                "sign")
689+        return struct.pack(MDMFSIGNABLEHEADER,
690+                           1,
691+                           self._seqnum,
692+                           self._root_hash,
693+                           self._salt_hash,
694+                           self._required_shares,
695+                           self._total_shares,
696+                           self._segment_size,
697+                           self._data_length)
698+
699+
700+    def put_signature(self, signature):
701+        """
702+        Put the signature field to the remote slot.
703+
704+        I require that the root hash and share hash chain have been put
705+        to the grid before I will write the signature to the grid.
706+        """
707+        if "signature" not in self._offsets:
708+            raise LayoutInvalid("You must put the share hash chain "
709+        # It does not make sense to put a signature without first
710+        # putting the root hash and the salt hash (since otherwise
711+        # the signature would be incomplete), so we don't allow that.
712+                       "before putting the signature")
713+        if not self._root_hash:
714+            raise LayoutInvalid("You must complete the signed prefix "
715+                                "before computing a signature")
716+        # If we put the signature after we put the verification key, we
717+        # could end up running into the verification key, and will
718+        # probably screw up the offsets as well. So we don't allow that.
719+        # The method that writes the verification key defines the EOF
720+        # offset before writing the verification key, so look for that.
721+        if "EOF" in self._offsets:
722+            raise LayoutInvalid("You must write the signature before the verification key")
723+
724+        self._offsets['verification_key'] = self._offsets['signature'] + len(signature)
725+        datavs = []
726+        datavs.append(tuple([self._offsets['signature'], signature]))
727+        def _on_failure():
728+            del(self._offsets['verification_key'])
729+        return self._write(datavs, on_failure=_on_failure)
730+
731+
732+    def put_verification_key(self, verification_key):
733+        """
734+        Put the verification key into the remote slot.
735+
736+        I require that the signature have been written to the storage
737+        server before I allow the verification key to be written to the
738+        remote server.
739+        """
740+        if "verification_key" not in self._offsets:
741+            raise LayoutInvalid("You must put the signature before you "
742+                                "can put the verification key")
743+        self._offsets['EOF'] = self._offsets['verification_key'] + len(verification_key)
744+        datavs = []
745+        datavs.append(tuple([self._offsets['verification_key'], verification_key]))
746+        def _on_failure():
747+            del(self._offsets['EOF'])
748+        return self._write(datavs, on_failure=_on_failure)
749+
750+
751+    def finish_publishing(self):
752+        """
753+        Write the offset table and encoding parameters to the remote
754+        slot, since that's the only thing we have yet to publish at this
755+        point.
756+        """
757+        if "EOF" not in self._offsets:
758+            raise LayoutInvalid("You must put the verification key before "
759+                                "you can publish the offsets")
760+        offsets_offset = struct.calcsize(MDMFHEADERWITHOUTOFFSETS)
761+        offsets = struct.pack(MDMFOFFSETS,
762+                              self._offsets['share-data'],
763+                              self._offsets['enc_privkey'],
764+                              self._offsets['block_hash_tree'],
765+                              self._offsets['salt_hash_tree'],
766+                              self._offsets['share_hash_chain'],
767+                              self._offsets['signature'],
768+                              self._offsets['verification_key'],
769+                              self._offsets['EOF'])
770+        datavs = []
771+        datavs.append(tuple([offsets_offset, offsets]))
772+        encoding_parameters_offset = struct.calcsize(MDMFCHECKSTRING)
773+        params = struct.pack(">BBQQ",
774+                             self._required_shares,
775+                             self._total_shares,
776+                             self._segment_size,
777+                             self._data_length)
778+        datavs.append(tuple([encoding_parameters_offset, params]))
779+        return self._write(datavs)
780+
781+
782+    def _write(self, datavs, on_failure=None, on_success=None):
783+        """I write the data vectors in datavs to the remote slot."""
784+        tw_vectors = {}
785+        new_share = False
786+        if not self._testvs:
787+            self._testvs = []
788+            self._testvs.append(tuple([0, 1, "eq", ""]))
789+            new_share = True
790+        if not self._written:
791+            # Write a new checkstring to the share when we write it, so
792+            # that we have something to check later.
793+            new_checkstring = self.get_checkstring()
794+            datavs.append((0, new_checkstring))
795+            def _first_write():
796+                self._written = True
797+                self._testvs = [(0, len(new_checkstring), "eq", new_checkstring)]
798+            on_success = _first_write
799+        tw_vectors[self._shnum] = (self._testvs, datavs, None)
800+        datalength = sum([len(x[1]) for x in datavs])
801+        d = self._rref.callRemote("slot_testv_and_readv_and_writev",
802+                                  self._storage_index,
803+                                  self._secrets,
804+                                  tw_vectors,
805+                                  self._readv)
806+        def _result(results):
807+            if isinstance(results, failure.Failure) or not results[0]:
808+                # Do nothing; the write was unsuccessful.
809+                if on_failure: on_failure()
810+            else:
811+                if on_success: on_success()
812+            return results
813+        d.addCallback(_result)
814+        return d
815+
816+
817+class MDMFSlotReadProxy:
818+    """
819+    I read from a mutable slot filled with data written in the MDMF data
820+    format (which is described above).
821+
822+    I can be initialized with some amount of data, which I will use (if
823+    it is valid) to eliminate some of the need to fetch it from servers.
824+    """
825+    def __init__(self,
826+                 rref,
827+                 storage_index,
828+                 shnum,
829+                 data=""):
830+        # Start the initialization process.
831+        self._rref = rref
832+        self._storage_index = storage_index
833+        self.shnum = shnum
834+
835+        # Before doing anything, the reader is probably going to want to
836+        # verify that the signature is correct. To do that, they'll need
837+        # the verification key, and the signature. To get those, we'll
838+        # need the offset table. So fetch the offset table on the
839+        # assumption that that will be the first thing that a reader is
840+        # going to do.
841+
842+        # The fact that these encoding parameters are None tells us
843+        # that we haven't yet fetched them from the remote share, so we
844+        # should. We could just not set them, but the checks will be
845+        # easier to read if we don't have to use hasattr.
846+        self._version_number = None
847+        self._sequence_number = None
848+        self._root_hash = None
849+        self._salt_hash = None
850+        self._salt = None
851+        self._required_shares = None
852+        self._total_shares = None
853+        self._segment_size = None
854+        self._data_length = None
855+        self._offsets = None
856+
857+        # If the user has chosen to initialize us with some data, we'll
858+        # try to satisfy subsequent data requests with that data before
859+        # asking the storage server for it.
860+        self._data = data
861+        # The filenode's cache hands us None when there is no cached
862+        # data, but the way we index the cached data requires a string,
863+        # so convert None to "".
864+        if self._data is None:
865+            self._data = ""
866+
867+        self._queue_observers = observer.ObserverList()
868+        self._readvs = []
869+
870+
871+    def _maybe_fetch_offsets_and_header(self, force_remote=False):
872+        """
873+        I fetch the offset table and the header from the remote slot if
874+        I don't already have them. If I do have them, I do nothing and
875+        return an empty Deferred.
876+        """
877+        if self._offsets:
878+            return defer.succeed(None)
879+        # At this point, we may be either SDMF or MDMF. Fetching 91
880+        # bytes will be enough to get information for both SDMF and
881+        # MDMF, though we'll be left with about 20 more bytes than we
882+        # need if this ends up being SDMF. We could just fetch the first
883+        # byte, which would save the extra bytes at the cost of an
884+        # additional roundtrip after we parse the result.
885+        readvs = [(0, 91)]
886+        d = self._read(readvs, force_remote)
887+        d.addCallback(self._process_encoding_parameters)
888+
889+        # Now, we have the encoding parameters, which will tell us
890+        # where we need to look for the offset table.
891+        def _fetch_offsets(ignored):
892+            if self._version_number == 0:
893+                # In SDMF, the offset table starts at byte 75, and
894+                # extends for 32 bytes
895+                readv = [(75, 32)] # struct.calcsize(">LLLLQQ") == 32
896+
897+            elif self._version_number == 1:
898+                # In MDMF, the offset table starts at byte 91 and
899+                # extends for 60 bytes
900+                readv = [(91, 60)] # struct.calcsize(">LQQQQQQQ") == 60
901+            else:
902+                raise LayoutInvalid("I only understand SDMF and MDMF")
903+            return readv
904+
905+        d.addCallback(_fetch_offsets)
906+        d.addCallback(lambda readv:
907+            self._read(readv, force_remote))
908+        d.addCallback(self._process_offsets)
909+        return d
910+
911+
912+    def _process_encoding_parameters(self, encoding_parameters):
913+        assert self.shnum in encoding_parameters
914+        encoding_parameters = encoding_parameters[self.shnum][0]
915+        # The first byte is the version number. It will tell us what
916+        # to do next.
917+        (verno,) = struct.unpack(">B", encoding_parameters[:1])
918+        if verno == MDMF_VERSION:
919+            (verno,
920+             seqnum,
921+             root_hash,
922+             salt_hash,
923+             k,
924+             n,
925+             segsize,
926+             datalen) = struct.unpack(MDMFHEADERWITHOUTOFFSETS,
927+                                      encoding_parameters)
928+            self._salt_hash = salt_hash
929+            if segsize == 0 and datalen == 0:
930+                # Empty file, no segments.
931+                self._num_segments = 0
932+            else:
933+                self._num_segments = mathutil.div_ceil(datalen, segsize)
934+
935+        elif verno == SDMF_VERSION:
936+            (verno,
937+             seqnum,
938+             root_hash,
939+             salt,
940+             k,
941+             n,
942+             segsize,
943+             datalen) = struct.unpack(">BQ32s16s BBQQ",
944+                                      encoding_parameters[:75])
945+            self._salt = salt
946+            if segsize == 0 and datalen == 0:
947+                # empty file
948+                self._num_segments = 0
949+            else:
950+                # non-empty SDMF files have one segment.
951+                self._num_segments = 1
952+        else:
953+            raise UnknownVersionError("You asked me to read mutable file "
954+                                      "version %d, but I only understand "
955+                                      "%d and %d" % (verno, SDMF_VERSION,
956+                                                     MDMF_VERSION))
957+
958+        self._version_number = verno
959+        self._sequence_number = seqnum
960+        self._root_hash = root_hash
961+        self._required_shares = k
962+        self._total_shares = n
963+        self._segment_size = segsize
964+        self._data_length = datalen
965+
966+        self._block_size = self._segment_size / self._required_shares
967+        # We can upload empty files, and need to account for this fact
968+        # so as to avoid zero-division and zero-modulo errors.
969+        if datalen > 0:
970+            tail_size = self._data_length % self._segment_size
971+        else:
972+            tail_size = 0
973+        if not tail_size:
974+            self._tail_block_size = self._block_size
975+        else:
976+            self._tail_block_size = mathutil.next_multiple(tail_size,
977+                                                    self._required_shares)
978+            self._tail_block_size /= self._required_shares
979+
980+
981+    def _process_offsets(self, offsets):
982+        assert self.shnum in offsets
983+        offsets = offsets[self.shnum][0]
984+        if self._version_number == 0:
985+            (signature,
986+             share_hash_chain,
987+             block_hash_tree,
988+             share_data,
989+             enc_privkey,
990+             EOF) = struct.unpack(">LLLLQQ", offsets)
991+            self._offsets = {}
992+            self._offsets['signature'] = signature
993+            self._offsets['share_data'] = share_data
994+            self._offsets['block_hash_tree'] = block_hash_tree
995+            self._offsets['share_hash_chain'] = share_hash_chain
996+            self._offsets['enc_privkey'] = enc_privkey
997+            self._offsets['EOF'] = EOF
998+        elif self._version_number == 1:
999+            (share_data,
1000+             encprivkey,
1001+             blockhashes,
1002+             salthashes,
1003+             sharehashes,
1004+             signature,
1005+             verification_key,
1006+             eof) = struct.unpack(MDMFOFFSETS, offsets)
1007+            self._offsets = {}
1008+            self._offsets['share_data'] = share_data
1009+            self._offsets['enc_privkey'] = encprivkey
1010+            self._offsets['block_hash_tree'] = blockhashes
1011+            self._offsets['salt_hash_tree']= salthashes
1012+            self._offsets['share_hash_chain'] = sharehashes
1013+            self._offsets['signature'] = signature
1014+            self._offsets['verification_key'] = verification_key
1015+            self._offsets['EOF'] = eof
1016+
1017+
1018+    def get_block_and_salt(self, segnum, queue=False):
1019+        """
1020+        I return (block, salt), where block is the block data and
1021+        salt is the salt used to encrypt that segment.
1022+        """
1023+        d = self._maybe_fetch_offsets_and_header()
1024+        def _then(ignored):
1025+            base_share_offset = self._offsets['share_data']
1026+            if self._version_number == 1:
1027+                base_salt_offset = struct.calcsize(MDMFHEADER)
1028+                salt_offset = base_salt_offset + SALT_SIZE * segnum
1029+            else:
1030+                salt_offset = None # no per-segment salts in SDMF
1031+            return base_share_offset, salt_offset
1032+
1033+        d.addCallback(_then)
1034+
1035+        def _calculate_share_offset(share_and_salt_offset):
1036+            base_share_offset, salt_offset = share_and_salt_offset
1037+            if segnum + 1 > self._num_segments:
1038+                raise LayoutInvalid("Not a valid segment number")
1039+
1040+            share_offset = base_share_offset + self._block_size * segnum
1041+            if segnum + 1 == self._num_segments:
1042+                data = self._tail_block_size
1043+            else:
1044+                data = self._block_size
1045+            readvs = [(share_offset, data)]
1046+            if salt_offset:
1047+                readvs.insert(0,(salt_offset, SALT_SIZE))
1048+            return readvs
1049+
1050+        d.addCallback(_calculate_share_offset)
1051+        d.addCallback(lambda readvs:
1052+            self._read(readvs, queue=queue))
1053+        def _process_results(results):
1054+            assert self.shnum in results
1055+            if self._version_number == 0:
1056+                # We only read the share data, but we know the salt from
1057+                # when we fetched the header
1058+                data = results[self.shnum]
1059+                if not data:
1060+                    data = ""
1061+                else:
1062+                    assert len(data) == 1
1063+                    data = data[0]
1064+                salt = self._salt
1065+            else:
1066+                data = results[self.shnum]
1067+                if not data:
1068+                    salt = data = ""
1069+                else:
1070+                    assert len(data) == 2
1071+                    salt, data = results[self.shnum]
1072+            return data, salt
1073+        d.addCallback(_process_results)
1074+        return d
1075+
1076+
1077+    def get_blockhashes(self, needed=None, queue=False, force_remote=False):
1078+        """
1079+        I return the block hash tree
1080+
1081+        I take an optional argument, needed, which is a set of indices
1082+        corresponding to hashes that I should fetch. If this argument is
1083+        missing, I will fetch the entire block hash tree; otherwise, I
1084+        may attempt to fetch fewer hashes, based on what needed says
1085+        that I should do. Note that I may fetch as many hashes as I
1086+        want, so long as the set of hashes that I do fetch is a superset
1087+        of the ones that I am asked for, so callers should be prepared
1088+        to tolerate additional hashes.
1089+        """
1090+        # TODO: Return only the parts of the block hash tree necessary
1091+        # to validate the blocknum provided?
1092+        # This is a good idea, but it is hard to implement correctly. It
1093+        # is bad to fetch any one block hash more than once, so we
1094+        # probably just want to fetch the whole thing at once and then
1095+        # serve it.
1096+        if needed == set([]):
1097+            return defer.succeed([])
1098+        d = self._maybe_fetch_offsets_and_header()
1099+        def _then(ignored):
1100+            blockhashes_offset = self._offsets['block_hash_tree']
1101+            if self._version_number == 1:
1102+                blockhashes_length = self._offsets['salt_hash_tree'] - blockhashes_offset
1103+            else:
1104+                blockhashes_length = self._offsets['share_data'] - blockhashes_offset
1105+            readvs = [(blockhashes_offset, blockhashes_length)]
1106+            return readvs
1107+        d.addCallback(_then)
1108+        d.addCallback(lambda readvs:
1109+            self._read(readvs, queue=queue, force_remote=force_remote))
1110+        def _build_block_hash_tree(results):
1111+            assert self.shnum in results
1112+
1113+            rawhashes = results[self.shnum][0]
1114+            results = [rawhashes[i:i+HASH_SIZE]
1115+                       for i in range(0, len(rawhashes), HASH_SIZE)]
1116+            return results
1117+        d.addCallback(_build_block_hash_tree)
1118+        return d
1119+
1120+
1121+    def get_salthashes(self, needed=None, queue=False):
1122+        """
1123+        I return the salt hash tree.
1124+
1125+        I accept an optional argument, needed, which is a set of indices
1126+        corresponding to hashes that I should fetch. If this argument is
1127+        missing, I will fetch and return the entire salt hash tree.
1128+        Otherwise, I may fetch any part of the salt hash tree, so long
1129+        as the part that I fetch and return is a superset of the part
1130+        that my caller has asked for. Callers should be prepared to
1131+        tolerate this behavior.
1132+
1133+        This method is only meaningful for MDMF files, as only MDMF
1134+        files have a salt hash tree. If the remote file is an SDMF file,
1135+        this method will return False.
1136+        """
1137+        # TODO: Only get the leaf nodes implied by salthashes
1138+        if needed == set([]):
1139+            return defer.succeed([])
1140+        d = self._maybe_fetch_offsets_and_header()
1141+        def _then(ignored):
1142+            if self._version_number == 0:
1143+                return []
1144+            else:
1145+                salthashes_offset = self._offsets['salt_hash_tree']
1146+                salthashes_length = self._offsets['share_hash_chain'] - salthashes_offset
1147+                return [(salthashes_offset, salthashes_length)]
1148+        d.addCallback(_then)
1149+        def _maybe_read(readvs):
1150+            if readvs:
1151+                return self._read(readvs, queue=queue)
1152+            else:
1153+                return False
1154+        d.addCallback(_maybe_read)
1155+        def _process_results(results):
1156+            if not results:
1157+                return False
1158+            assert self.shnum in results
1159+
1160+            rawhashes = results[self.shnum][0]
1161+            results = [rawhashes[i:i+HASH_SIZE]
1162+                       for i in range(0, len(rawhashes), HASH_SIZE)]
1163+            return results
1164+        d.addCallback(_process_results)
1165+        return d
1166+
1167+
1168+    def get_sharehashes(self, needed=None, queue=False, force_remote=False):
1169+        """
1170+        I return the part of the share hash chain needed to validate
1171+        this share.
1172+
1173+        I take an optional argument, needed. Needed is a set of indices
1174+        that correspond to the hashes that I should fetch. If needed is
1175+        not present, I will fetch and return the entire share hash
1176+        chain. Otherwise, I may fetch and return any part of the share
1177+        hash chain that is a superset of the part that I am asked to
1178+        fetch. Callers should be prepared to deal with more hashes than
1179+        they've asked for.
1180+        """
1181+        if needed == set([]):
1182+            return defer.succeed([])
1183+        d = self._maybe_fetch_offsets_and_header()
1184+
1185+        def _make_readvs(ignored):
1186+            sharehashes_offset = self._offsets['share_hash_chain']
1187+            if self._version_number == 0:
1188+                sharehashes_length = self._offsets['block_hash_tree'] - sharehashes_offset
1189+            else:
1190+                sharehashes_length = self._offsets['signature'] - sharehashes_offset
1191+            readvs = [(sharehashes_offset, sharehashes_length)]
1192+            return readvs
1193+        d.addCallback(_make_readvs)
1194+        d.addCallback(lambda readvs:
1195+            self._read(readvs, queue=queue, force_remote=force_remote))
1196+        def _build_share_hash_chain(results):
1197+            assert self.shnum in results
1198+
1199+            sharehashes = results[self.shnum][0]
1200+            results = [sharehashes[i:i+(HASH_SIZE + 2)]
1201+                       for i in range(0, len(sharehashes), HASH_SIZE + 2)]
1202+            results = dict([struct.unpack(">H32s", data)
1203+                            for data in results])
1204+            return results
1205+        d.addCallback(_build_share_hash_chain)
1206+        return d
1207+
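    # For reference, the on-disk share hash chain is a concatenation of
    # 34-byte entries, each a 2-byte big-endian share number followed by a
    # 32-byte hash. An illustrative round trip (values are made up):
    #
    #   entry = struct.pack(">H32s", 3, "h" * 32)            # 34 bytes
    #   shnum, sharehash = struct.unpack(">H32s", entry)      # (3, "h"*32)
    #
    # _build_share_hash_chain above rebuilds the {shnum: hash} dict by
    # slicing the raw string into 34-byte pieces and unpacking each one.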
1208+
1209+    def get_encprivkey(self, queue=False):
1210+        """
1211+        I return the encrypted private key.
1212+        """
1213+        d = self._maybe_fetch_offsets_and_header()
1214+
1215+        def _make_readvs(ignored):
1216+            privkey_offset = self._offsets['enc_privkey']
1217+            if self._version_number == 0:
1218+                privkey_length = self._offsets['EOF'] - privkey_offset
1219+            else:
1220+                privkey_length = self._offsets['block_hash_tree'] - privkey_offset
1221+            readvs = [(privkey_offset, privkey_length)]
1222+            return readvs
1223+        d.addCallback(_make_readvs)
1224+        d.addCallback(lambda readvs:
1225+            self._read(readvs, queue=queue))
1226+        def _process_results(results):
1227+            assert self.shnum in results
1228+            privkey = results[self.shnum][0]
1229+            return privkey
1230+        d.addCallback(_process_results)
1231+        return d
1232+
1233+
1234+    def get_signature(self, queue=False):
1235+        """
1236+        I return the signature of my share.
1237+        """
1238+        d = self._maybe_fetch_offsets_and_header()
1239+
1240+        def _make_readvs(ignored):
1241+            signature_offset = self._offsets['signature']
1242+            if self._version_number == 1:
1243+                signature_length = self._offsets['verification_key'] - signature_offset
1244+            else:
1245+                signature_length = self._offsets['share_hash_chain'] - signature_offset
1246+            readvs = [(signature_offset, signature_length)]
1247+            return readvs
1248+        d.addCallback(_make_readvs)
1249+        d.addCallback(lambda readvs:
1250+            self._read(readvs, queue=queue))
1251+        def _process_results(results):
1252+            assert self.shnum in results
1253+            signature = results[self.shnum][0]
1254+            return signature
1255+        d.addCallback(_process_results)
1256+        return d
1257+
1258+
1259+    def get_verification_key(self, queue=False):
1260+        """
1261+        I return the verification key.
1262+        """
1263+        d = self._maybe_fetch_offsets_and_header()
1264+
1265+        def _make_readvs(ignored):
1266+            if self._version_number == 1:
1267+                vk_offset = self._offsets['verification_key']
1268+                vk_length = self._offsets['EOF'] - vk_offset
1269+            else:
1270+                vk_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
1271+                vk_length = self._offsets['signature'] - vk_offset
1272+            readvs = [(vk_offset, vk_length)]
1273+            return readvs
1274+        d.addCallback(_make_readvs)
1275+        d.addCallback(lambda readvs:
1276+            self._read(readvs, queue=queue))
1277+        def _process_results(results):
1278+            assert self.shnum in results
1279+            verification_key = results[self.shnum][0]
1280+            return verification_key
1281+        d.addCallback(_process_results)
1282+        return d
1283+
1284+
1285+    def get_encoding_parameters(self):
1286+        """
1287+        I return (k, n, segsize, datalen)
1288+        """
1289+        d = self._maybe_fetch_offsets_and_header()
1290+        d.addCallback(lambda ignored:
1291+            (self._required_shares,
1292+             self._total_shares,
1293+             self._segment_size,
1294+             self._data_length))
1295+        return d
1296+
1297+
1298+    def get_seqnum(self):
1299+        """
1300+        I return the sequence number for this share.
1301+        """
1302+        d = self._maybe_fetch_offsets_and_header()
1303+        d.addCallback(lambda ignored:
1304+            self._sequence_number)
1305+        return d
1306+
1307+
1308+    def get_root_hash(self):
1309+        """
1310+        I return the root of the share hash tree
1311+        """
1312+        d = self._maybe_fetch_offsets_and_header()
1313+        d.addCallback(lambda ignored: self._root_hash)
1314+        return d
1315+
1316+
1317+    def get_salt_hash(self):
1318+        """
1319+        I return the flat salt hash
1320+        """
1321+        d = self._maybe_fetch_offsets_and_header()
1322+        d.addCallback(lambda ignored: self._salt_hash)
1323+        return d
1324+
1325+
1326+    def get_checkstring(self):
1327+        """
1328+        I return the packed representation of the following:
1329+
1330+            - version number
1331+            - sequence number
1332+            - root hash
1333+            - salt hash (or, for SDMF shares, the salt)
1334+
1335+        which my users use as a checkstring to detect other writers.
1336+        """
1337+        d = self._maybe_fetch_offsets_and_header()
1338+        def _build_checkstring(ignored):
1339+            if self._salt_hash:
1340+                checkstring = struct.pack(MDMFCHECKSTRING,
1341+                                          self._version_number,
1342+                                          self._sequence_number,
1343+                                          self._root_hash,
1344+                                          self._salt_hash)
1345+            else:
1346+                checkstring = struct.pack(PREFIX,
1347+                                         self._version_number,
1348+                                         self._sequence_number,
1349+                                         self._root_hash,
1350+                                         self._salt)
1351+            return checkstring
1352+        d.addCallback(_build_checkstring)
1353+        return d
1354+
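    # For reference, the MDMF checkstring is 73 bytes: a 1-byte version
    # number, an 8-byte sequence number, a 32-byte root hash, and a
    # 32-byte salt hash (1 + 8 + 32 + 32 = 73). The tests in
    # test_storage.py build the same value by hand:
    #
    #   checkstring = struct.pack(">BQ32s32s", 1, 0, root_hash, salt_hash)
    #
    # The SDMF checkstring carries a 16-byte IV in place of the salt hash.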
1355+
1356+    def get_prefix(self, force_remote):
1357+        d = self._maybe_fetch_offsets_and_header(force_remote)
1358+        d.addCallback(lambda ignored:
1359+            self._build_prefix())
1360+        return d
1361+
1362+
1363+    def _build_prefix(self):
1364+        # The prefix is another name for the part of the remote share
1365+        # that gets signed. It consists of everything up to and
1366+        # including the datalength, packed by struct.
1367+        if self._version_number == SDMF_VERSION:
1368+            format_string = SIGNED_PREFIX
1369+            salt_to_use = self._salt
1370+        else:
1371+            format_string = MDMFSIGNABLEHEADER
1372+            salt_to_use = self._salt_hash
1373+        return struct.pack(format_string,
1374+                           self._version_number,
1375+                           self._sequence_number,
1376+                           self._root_hash,
1377+                           salt_to_use,
1378+                           self._required_shares,
1379+                           self._total_shares,
1380+                           self._segment_size,
1381+                           self._data_length)
1382+
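    # For reference, the MDMF signable prefix unpacks with ">BQ32s32sBBQQ"
    # (see _check_signable in test_storage.py): version (1) + seqnum (8) +
    # root hash (32) + salt hash (32) + k (1) + N (1) + segsize (8) +
    # datalen (8) = 91 bytes, which is why the offset table in the full
    # header starts at byte 91.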
1383+
1384+    def _get_offsets_tuple(self):
1385+        # The offsets tuple is another component of the version
1386+        # information tuple. For now it is just a copy of our offsets
1387+        # dictionary.
1388+        return self._offsets.copy()
1389+
1390+
1391+    def get_verinfo(self):
1392+        """
1393+        I return my verinfo tuple. This is used by the ServermapUpdater
1394+        to keep track of versions of mutable files.
1395+
1396+        The verinfo tuple for MDMF files contains:
1397+            - seqnum
1398+            - root hash
1399+            - salt hash
1400+            - segsize
1401+            - datalen
1402+            - k
1403+            - n
1404+            - prefix (the thing that you sign)
1405+            - a tuple of offsets
1406+
1407+        The verinfo tuple for SDMF files is the same, but contains a
1408+        16-byte IV instead of a hash of salts.
1409+        """
1410+        d = self._maybe_fetch_offsets_and_header()
1411+        def _build_verinfo(ignored):
1412+            if self._version_number == SDMF_VERSION:
1413+                salt_to_use = self._salt
1414+            else:
1415+                salt_to_use = self._salt_hash
1416+            return (self._sequence_number,
1417+                    self._root_hash,
1418+                    salt_to_use,
1419+                    self._segment_size,
1420+                    self._data_length,
1421+                    self._required_shares,
1422+                    self._total_shares,
1423+                    self._build_prefix(),
1424+                    self._get_offsets_tuple())
1425+        d.addCallback(_build_verinfo)
1426+        return d
1427+
1428+
1429+    def flush(self):
1430+        """
1431+        I flush my queue of read vectors.
1432+        """
1433+        d = self._read(self._readvs)
1434+        def _then(results):
1435+            self._readvs = []
1436+            self._queue_observers.notify(results)
1437+            self._queue_observers = observer.ObserverList()
1438+        d.addCallback(_then)
1439+
1440+
1441+    def _read(self, readvs, force_remote=False, queue=False):
1442+        unsatisfiable = filter(lambda x: x[0] + x[1] > len(self._data), readvs)
1443+        # TODO: It's entirely possible to tweak this so that it just
1444+        # fulfills the requests that it can, and not demand that all
1445+        # requests are satisfiable before running it.
1446+        if not unsatisfiable and not force_remote:
1447+            results = [self._data[offset:offset+length]
1448+                       for (offset, length) in readvs]
1449+            results = {self.shnum: results}
1450+            return defer.succeed(results)
1451+        else:
1452+            if queue:
1453+                start = len(self._readvs)
1454+                self._readvs += readvs
1455+                end = len(self._readvs)
1456+                def _get_results(results, start, end):
1457+                    if not self.shnum in results:
1458+                        return {self.shnum: [""]}
1459+                    return {self.shnum: results[self.shnum][start:end]}
1460+                d = defer.Deferred()
1461+                d.addCallback(_get_results, start, end)
1462+                self._queue_observers.subscribe(d.callback)
1463+                return d
1464+            return self._rref.callRemote("slot_readv",
1465+                                         self._storage_index,
1466+                                         [self.shnum],
1467+                                         readvs)
1468+
1469+
1470+    def is_sdmf(self):
1471+        """I tell my caller whether or not my remote file is SDMF or MDMF
1472+        """
1473+        d = self._maybe_fetch_offsets_and_header()
1474+        d.addCallback(lambda ignored:
1475+            self._version_number == 0)
1476+        return d
1477+
1478+
1479+class LayoutInvalid(Exception):
1480+    """
1481+    This isn't a valid MDMF mutable file
1482+    """
1483hunk ./src/allmydata/test/test_storage.py 2
1484 
1485-import time, os.path, stat, re, simplejson, struct
1486+import time, os.path, stat, re, simplejson, struct, shutil
1487 
1488 from twisted.trial import unittest
1489 
1490hunk ./src/allmydata/test/test_storage.py 22
1491 from allmydata.storage.expirer import LeaseCheckingCrawler
1492 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
1493      ReadBucketProxy
1494-from allmydata.interfaces import BadWriteEnablerError
1495-from allmydata.test.common import LoggingServiceParent
1496+from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
1497+                                     LayoutInvalid, MDMFSIGNABLEHEADER, \
1498+                                     SIGNED_PREFIX, MDMFHEADER
1499+from allmydata.interfaces import BadWriteEnablerError, MDMF_VERSION, \
1500+                                 SDMF_VERSION
1501+from allmydata.test.common import LoggingServiceParent, ShouldFailMixin
1502 from allmydata.test.common_web import WebRenderingMixin
1503 from allmydata.web.storage import StorageStatus, remove_prefix
1504 
1505hunk ./src/allmydata/test/test_storage.py 105
1506 
1507 class RemoteBucket:
1508 
1509+    def __init__(self):
1510+        self.read_count = 0
1511+        self.write_count = 0
1512+
1513     def callRemote(self, methname, *args, **kwargs):
1514         def _call():
1515             meth = getattr(self.target, "remote_" + methname)
1516hunk ./src/allmydata/test/test_storage.py 113
1517             return meth(*args, **kwargs)
1518+
1519+        if methname == "slot_readv":
1520+            self.read_count += 1
1521+        if methname == "slot_writev":
1522+            self.write_count += 1
1523+
1524         return defer.maybeDeferred(_call)
1525 
1526hunk ./src/allmydata/test/test_storage.py 121
1527+
1528 class BucketProxy(unittest.TestCase):
1529     def make_bucket(self, name, size):
1530         basedir = os.path.join("storage", "BucketProxy", name)
1531hunk ./src/allmydata/test/test_storage.py 1298
1532         self.failUnless(os.path.exists(prefixdir), prefixdir)
1533         self.failIf(os.path.exists(bucketdir), bucketdir)
1534 
1535+
1536+class MDMFProxies(unittest.TestCase, ShouldFailMixin):
1537+    def setUp(self):
1538+        self.sparent = LoggingServiceParent()
1539+        self._lease_secret = itertools.count()
1540+        self.ss = self.create("MDMFProxies storage test server")
1541+        self.rref = RemoteBucket()
1542+        self.rref.target = self.ss
1543+        self.secrets = (self.write_enabler("we_secret"),
1544+                        self.renew_secret("renew_secret"),
1545+                        self.cancel_secret("cancel_secret"))
1546+        self.segment = "aaaaaa"
1547+        self.block = "aa"
1548+        self.salt = "a" * 16
1549+        self.block_hash = "a" * 32
1550+        self.block_hash_tree = [self.block_hash for i in xrange(6)]
1551+        self.share_hash = self.block_hash
1552+        self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)])
1553+        self.signature = "foobarbaz"
1554+        self.verification_key = "vvvvvv"
1555+        self.encprivkey = "private"
1556+        self.root_hash = self.block_hash
1557+        self.salt_hash = self.root_hash
1558+        self.salt_hash_tree = [self.salt_hash for i in xrange(6)]
1559+        self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
1560+        self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
1561+        # blockhashes and salt hashes are serialized in the same way;
1562+        # the only difference is that we lop off the first salt hash
1563+        # and store it in the header instead.
1564+        self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
1565+
1566+
1567+    def tearDown(self):
1568+        self.sparent.stopService()
1569+        shutil.rmtree(self.workdir("MDMFProxies storage test server"))
1570+
1571+
1572+    def write_enabler(self, we_tag):
1573+        return hashutil.tagged_hash("we_blah", we_tag)
1574+
1575+
1576+    def renew_secret(self, tag):
1577+        return hashutil.tagged_hash("renew_blah", str(tag))
1578+
1579+
1580+    def cancel_secret(self, tag):
1581+        return hashutil.tagged_hash("cancel_blah", str(tag))
1582+
1583+
1584+    def workdir(self, name):
1585+        basedir = os.path.join("storage", "MutableServer", name)
1586+        return basedir
1587+
1588+
1589+    def create(self, name):
1590+        workdir = self.workdir(name)
1591+        ss = StorageServer(workdir, "\x00" * 20)
1592+        ss.setServiceParent(self.sparent)
1593+        return ss
1594+
1595+
1596+    def build_test_mdmf_share(self, tail_segment=False, empty=False):
1597+        # Start with the checkstring
1598+        data = struct.pack(">BQ32s32s",
1599+                           1,
1600+                           0,
1601+                           self.root_hash,
1602+                           self.salt_hash)
1603+        self.checkstring = data
1604+        # Next, the encoding parameters
1605+        if tail_segment:
1606+            data += struct.pack(">BBQQ",
1607+                                3,
1608+                                10,
1609+                                6,
1610+                                33)
1611+        elif empty:
1612+            data += struct.pack(">BBQQ",
1613+                                3,
1614+                                10,
1615+                                0,
1616+                                0)
1617+        else:
1618+            data += struct.pack(">BBQQ",
1619+                                3,
1620+                                10,
1621+                                6,
1622+                                36)
1623+        # Now we'll build the offsets.
1624+        # The header -- everything up to the salts -- is 151 bytes long.
1625+        # The shares come after the salts.
1626+        if empty:
1627+            salts = ""
1628+        else:
1629+            salts = self.salt * 6
1630+        share_offset = 151 + len(salts)
1631+        if tail_segment:
1632+            sharedata = self.block * 6
1633+        elif empty:
1634+            sharedata = ""
1635+        else:
1636+            sharedata = self.block * 6 + "a"
1637+        # The encrypted private key comes after the shares
1638+        encrypted_private_key_offset = share_offset + len(sharedata)
1639+        # The blockhashes come after the private key
1640+        blockhashes_offset = encrypted_private_key_offset + len(self.encprivkey)
1641+        # The salthashes come after the blockhashes
1642+        salthashes_offset = blockhashes_offset + len(self.block_hash_tree_s)
1643+        # The sharehashes come after the salt hashes
1644+        sharehashes_offset = salthashes_offset + len(self.salt_hash_tree_s)
1645+        # The signature comes after the share hash chain
1646+        signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
1647+        # The verification key comes after the signature
1648+        verification_offset = signature_offset + len(self.signature)
1649+        # The EOF comes after the verification key
1650+        eof_offset = verification_offset + len(self.verification_key)
1651+        data += struct.pack(">LQQQQQQQ",
1652+                            share_offset,
1653+                            encrypted_private_key_offset,
1654+                            blockhashes_offset,
1655+                            salthashes_offset,
1656+                            sharehashes_offset,
1657+                            signature_offset,
1658+                            verification_offset,
1659+                            eof_offset)
1660+        self.offsets = {}
1661+        self.offsets['share_data'] = share_offset
1662+        self.offsets['enc_privkey'] = encrypted_private_key_offset
1663+        self.offsets['block_hash_tree'] = blockhashes_offset
1664+        self.offsets['salt_hash_tree'] = salthashes_offset
1665+        self.offsets['share_hash_chain'] = sharehashes_offset
1666+        self.offsets['signature'] = signature_offset
1667+        self.offsets['verification_key'] = verification_offset
1668+        self.offsets['EOF'] = eof_offset
1669+        # Next, we'll add in the salts,
1670+        data += salts
1671+        # the share data,
1672+        data += sharedata
1673+        # the private key,
1674+        data += self.encprivkey
1675+        # the block hash tree,
1676+        data += self.block_hash_tree_s
1677+        # the salt hash tree
1678+        data += self.salt_hash_tree_s
1679+        # the share hash chain,
1680+        data += self.share_hash_chain_s
1681+        # the signature,
1682+        data += self.signature
1683+        # and the verification key
1684+        data += self.verification_key
1685+        return data
1686+
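    # For the default share built above (tail_segment=False, empty=False),
    # the regions land at the following byte offsets. These numbers are
    # derived from the field sizes set up in setUp and are illustrative,
    # not asserted anywhere:
    #
    #     0   header (73-byte checkstring + 18 bytes of encoding
    #         parameters + 60-byte offset table = 151 bytes)
    #   151   salts              (6 * 16 = 96 bytes)
    #   247   share data         (6 * 2 + 1 = 13 bytes)
    #   260   encrypted privkey  (7 bytes)
    #   267   block hash tree    (6 * 32 = 192 bytes)
    #   459   salt hash tree     (5 * 32 = 160 bytes)
    #   619   share hash chain   (6 * 34 = 204 bytes)
    #   823   signature          (9 bytes)
    #   832   verification key   (6 bytes)
    #   838   EOF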
1687+
1688+    def write_test_share_to_server(self,
1689+                                   storage_index,
1690+                                   tail_segment=False,
1691+                                   empty=False):
1692+        """
1693+        I write some data for the read tests to read to self.ss
1694+
1695+        If tail_segment=True, then I will write a share that has a
1696+        smaller tail segment than other segments.
1697+        """
1698+        write = self.ss.remote_slot_testv_and_readv_and_writev
1699+        data = self.build_test_mdmf_share(tail_segment, empty)
1700+        # Finally, we write the whole thing to the storage server in one
1701+        # pass.
1702+        testvs = [(0, 1, "eq", "")]
1703+        tws = {}
1704+        tws[0] = (testvs, [(0, data)], None)
1705+        readv = [(0, 1)]
1706+        results = write(storage_index, self.secrets, tws, readv)
1707+        self.failUnless(results[0])
1708+
1709+
1710+    def build_test_sdmf_share(self, empty=False):
1711+        if empty:
1712+            sharedata = ""
1713+        else:
1714+            sharedata = self.segment * 6
1715+        blocksize = len(sharedata) / 3
1716+        block = sharedata[:blocksize]
1717+        prefix = struct.pack(">BQ32s16s BBQQ",
1718+                             0, # version,
1719+                             0,
1720+                             self.root_hash,
1721+                             self.salt,
1722+                             3,
1723+                             10,
1724+                             len(sharedata),
1725+                             len(sharedata),
1726+                            )
1727+        post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
1728+        signature_offset = post_offset + len(self.verification_key)
1729+        sharehashes_offset = signature_offset + len(self.signature)
1730+        blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
1731+        sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
1732+        encprivkey_offset = sharedata_offset + len(block)
1733+        eof_offset = encprivkey_offset + len(self.encprivkey)
1734+        offsets = struct.pack(">LLLLQQ",
1735+                              signature_offset,
1736+                              sharehashes_offset,
1737+                              blockhashes_offset,
1738+                              sharedata_offset,
1739+                              encprivkey_offset,
1740+                              eof_offset)
1741+        final_share = "".join([prefix,
1742+                               offsets,
1743+                               self.verification_key,
1744+                               self.signature,
1745+                               self.share_hash_chain_s,
1746+                               self.block_hash_tree_s,
1747+                               block,
1748+                               self.encprivkey])
1749+        self.offsets = {}
1750+        self.offsets['signature'] = signature_offset
1751+        self.offsets['share_hash_chain'] = sharehashes_offset
1752+        self.offsets['block_hash_tree'] = blockhashes_offset
1753+        self.offsets['share_data'] = sharedata_offset
1754+        self.offsets['enc_privkey'] = encprivkey_offset
1755+        self.offsets['EOF'] = eof_offset
1756+        return final_share
1757+
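    # Note that the SDMF ordering used above differs from MDMF: after the
    # 107-byte header (75-byte prefix + 32-byte offset table), the regions
    # run verification key, signature, share hash chain, block hash tree,
    # share data, and finally the encrypted private key.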
1758+
1759+    def write_sdmf_share_to_server(self,
1760+                                   storage_index,
1761+                                   empty=False):
1762+        # Some tests need SDMF shares to verify that we can still
1763+        # read them. This method writes one that resembles, but is not identical to, a real SDMF share.
1764+        assert self.rref
1765+        write = self.ss.remote_slot_testv_and_readv_and_writev
1766+        share = self.build_test_sdmf_share(empty)
1767+        testvs = [(0, 1, "eq", "")]
1768+        tws = {}
1769+        tws[0] = (testvs, [(0, share)], None)
1770+        readv = []
1771+        results = write(storage_index, self.secrets, tws, readv)
1772+        self.failUnless(results[0])
1773+
1774+
1775+    def test_read(self):
1776+        self.write_test_share_to_server("si1")
1777+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1778+        # Check that every method equals what we expect it to.
1779+        d = defer.succeed(None)
1780+        def _check_block_and_salt((block, salt)):
1781+            self.failUnlessEqual(block, self.block)
1782+            self.failUnlessEqual(salt, self.salt)
1783+
1784+        for i in xrange(6):
1785+            d.addCallback(lambda ignored, i=i:
1786+                mr.get_block_and_salt(i))
1787+            d.addCallback(_check_block_and_salt)
1788+
1789+        d.addCallback(lambda ignored:
1790+            mr.get_encprivkey())
1791+        d.addCallback(lambda encprivkey:
1792+            self.failUnlessEqual(self.encprivkey, encprivkey))
1793+
1794+        d.addCallback(lambda ignored:
1795+            mr.get_blockhashes())
1796+        d.addCallback(lambda blockhashes:
1797+            self.failUnlessEqual(self.block_hash_tree, blockhashes))
1798+
1799+        d.addCallback(lambda ignored:
1800+            mr.get_salthashes())
1801+        d.addCallback(lambda salthashes:
1802+            self.failUnlessEqual(self.salt_hash_tree[1:], salthashes))
1803+
1804+        d.addCallback(lambda ignored:
1805+            mr.get_sharehashes())
1806+        d.addCallback(lambda sharehashes:
1807+            self.failUnlessEqual(self.share_hash_chain, sharehashes))
1808+
1809+        d.addCallback(lambda ignored:
1810+            mr.get_signature())
1811+        d.addCallback(lambda signature:
1812+            self.failUnlessEqual(signature, self.signature))
1813+
1814+        d.addCallback(lambda ignored:
1815+            mr.get_verification_key())
1816+        d.addCallback(lambda verification_key:
1817+            self.failUnlessEqual(verification_key, self.verification_key))
1818+
1819+        d.addCallback(lambda ignored:
1820+            mr.get_seqnum())
1821+        d.addCallback(lambda seqnum:
1822+            self.failUnlessEqual(seqnum, 0))
1823+
1824+        d.addCallback(lambda ignored:
1825+            mr.get_root_hash())
1826+        d.addCallback(lambda root_hash:
1827+            self.failUnlessEqual(self.root_hash, root_hash))
1828+
1829+        d.addCallback(lambda ignored:
1830+            mr.get_salt_hash())
1831+        d.addCallback(lambda salt_hash:
1832+            self.failUnlessEqual(self.salt_hash, salt_hash))
1833+
1834+        d.addCallback(lambda ignored:
1835+            mr.get_seqnum())
1836+        d.addCallback(lambda seqnum:
1837+            self.failUnlessEqual(0, seqnum))
1838+
1839+        d.addCallback(lambda ignored:
1840+            mr.get_encoding_parameters())
1841+        def _check_encoding_parameters((k, n, segsize, datalen)):
1842+            self.failUnlessEqual(k, 3)
1843+            self.failUnlessEqual(n, 10)
1844+            self.failUnlessEqual(segsize, 6)
1845+            self.failUnlessEqual(datalen, 36)
1846+        d.addCallback(_check_encoding_parameters)
1847+
1848+        d.addCallback(lambda ignored:
1849+            mr.get_checkstring())
1850+        d.addCallback(lambda checkstring:
1851+            self.failUnlessEqual(checkstring, self.checkstring))
1852+        return d
1853+
1854+
1855+    def test_read_salthashes_on_sdmf_file(self):
1856+        self.write_sdmf_share_to_server("si1")
1857+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1858+        d = defer.succeed(None)
1859+        d.addCallback(lambda ignored:
1860+            mr.get_salthashes())
1861+        d.addCallback(lambda results:
1862+            self.failIf(results))
1863+        return d
1864+
1865+
1866+    def test_read_with_different_tail_segment_size(self):
1867+        self.write_test_share_to_server("si1", tail_segment=True)
1868+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1869+        d = mr.get_block_and_salt(5)
1870+        def _check_tail_segment(results):
1871+            block, salt = results
1872+            self.failUnlessEqual(len(block), 1)
1873+            self.failUnlessEqual(block, "a")
1874+        d.addCallback(_check_tail_segment)
1875+        return d
1876+
1877+
1878+    def test_get_block_with_invalid_segnum(self):
1879+        self.write_test_share_to_server("si1")
1880+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1881+        d = defer.succeed(None)
1882+        d.addCallback(lambda ignored:
1883+            self.shouldFail(LayoutInvalid, "test invalid segnum",
1884+                            None,
1885+                            mr.get_block_and_salt, 7))
1886+        return d
1887+
1888+
1889+    def test_get_encoding_parameters_first(self):
1890+        self.write_test_share_to_server("si1")
1891+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1892+        d = mr.get_encoding_parameters()
1893+        def _check_encoding_parameters((k, n, segment_size, datalen)):
1894+            self.failUnlessEqual(k, 3)
1895+            self.failUnlessEqual(n, 10)
1896+            self.failUnlessEqual(segment_size, 6)
1897+            self.failUnlessEqual(datalen, 36)
1898+        d.addCallback(_check_encoding_parameters)
1899+        return d
1900+
1901+
1902+    def test_get_seqnum_first(self):
1903+        self.write_test_share_to_server("si1")
1904+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1905+        d = mr.get_seqnum()
1906+        d.addCallback(lambda seqnum:
1907+            self.failUnlessEqual(seqnum, 0))
1908+        return d
1909+
1910+
1911+    def test_get_root_hash_first(self):
1912+        self.write_test_share_to_server("si1")
1913+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1914+        d = mr.get_root_hash()
1915+        d.addCallback(lambda root_hash:
1916+            self.failUnlessEqual(root_hash, self.root_hash))
1917+        return d
1918+
1919+
1920+    def test_get_salt_hash_first(self):
1921+        self.write_test_share_to_server("si1")
1922+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1923+        d = mr.get_salt_hash()
1924+        d.addCallback(lambda salt_hash:
1925+            self.failUnlessEqual(salt_hash, self.salt_hash))
1926+        return d
1927+
1928+
1929+    def test_get_checkstring_first(self):
1930+        self.write_test_share_to_server("si1")
1931+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1932+        d = mr.get_checkstring()
1933+        d.addCallback(lambda checkstring:
1934+            self.failUnlessEqual(checkstring, self.checkstring))
1935+        return d
1936+
1937+
1938+    def test_write_read_vectors(self):
1939+        # When we write, the storage server returns to us the result of
1940+        # a read vector along with the write's result. If a write fails because
1941+        # the test vectors failed, this read vector can help us to
1942+        # diagnose the problem. This test ensures that the read vector
1943+        # is working appropriately.
1944+        mw = self._make_new_mw("si1", 0)
1945+        d = defer.succeed(None)
1946+
1947+        # Write one block. This should return an empty checkstring,
1948+        # since there is no data on the server yet.
1949+        d.addCallback(lambda ignored:
1950+            mw.put_block(self.block, 0, self.salt))
1951+        def _check_first_write(results):
1952+            result, readvs = results
1953+            self.failUnless(result)
1954+            self.failIf(readvs)
1955+        d.addCallback(_check_first_write)
1956+        # Now, a different checkstring should be returned when
1957+        # we write subsequent blocks
1958+        d.addCallback(lambda ignored:
1959+            mw.put_block(self.block, 1, self.salt))
1960+        def _check_next_write(results):
1961+            result, readvs = results
1962+            self.failUnless(result)
1963+            self.expected_checkstring = mw.get_checkstring()
1964+            self.failUnlessIn(0, readvs)
1965+            self.failUnlessEqual(readvs[0][0], self.expected_checkstring)
1966+        d.addCallback(_check_next_write)
1967+        # Add the other four shares
1968+        for i in xrange(2, 6):
1969+            d.addCallback(lambda ignored, i=i:
1970+                mw.put_block(self.block, i, self.salt))
1971+            d.addCallback(_check_next_write)
1972+        # Add the encrypted private key
1973+        d.addCallback(lambda ignored:
1974+            mw.put_encprivkey(self.encprivkey))
1975+        d.addCallback(_check_next_write)
1976+        # Add the block hash tree and share hash tree
1977+        d.addCallback(lambda ignored:
1978+            mw.put_blockhashes(self.block_hash_tree))
1979+        d.addCallback(_check_next_write)
1980+        d.addCallback(lambda ignored:
1981+            mw.put_salthashes(self.salt_hash_tree))
1982+        d.addCallback(_check_next_write)
1983+        d.addCallback(lambda ignored:
1984+            mw.put_sharehashes(self.share_hash_chain))
1985+        d.addCallback(_check_next_write)
1986+        # Add the root hash and the salt hash. This should change the
1987+        # checkstring, but not in a way that we'll be able to see right
1988+        # now, since the read vectors are applied before the write
1989+        # vectors.
1990+        d.addCallback(lambda ignored:
1991+            mw.put_root_hash(self.root_hash))
1992+        def _check_old_testv_after_new_one_is_written(results):
1993+            result, readvs = results
1994+            self.failUnless(result)
1995+            self.failUnlessIn(0, readvs)
1996+            self.failUnlessEqual(self.expected_checkstring,
1997+                                 readvs[0][0])
1998+            new_checkstring = mw.get_checkstring()
1999+            self.failIfEqual(new_checkstring,
2000+                             readvs[0][0])
2001+        d.addCallback(_check_old_testv_after_new_one_is_written)
2002+        # Now add the signature. This should succeed, meaning that the
2003+        # data gets written and the read vector matches what the writer
2004+        # thinks should be there.
2005+        d.addCallback(lambda ignored:
2006+            mw.put_signature(self.signature))
2007+        d.addCallback(_check_next_write)
2008+        # The checkstring remains the same for the rest of the process.
2009+        return d
2010+
2011+
2012+    def test_blockhashes_after_salt_hash_tree(self):
2013+        mw = self._make_new_mw("si1", 0)
2014+        d = defer.succeed(None)
2015+        # Put everything up to and including the salt hash tree
2016+        for i in xrange(6):
2017+            d.addCallback(lambda ignored, i=i:
2018+                mw.put_block(self.block, i, self.salt))
2019+        d.addCallback(lambda ignored:
2020+            mw.put_encprivkey(self.encprivkey))
2021+        d.addCallback(lambda ignored:
2022+            mw.put_blockhashes(self.block_hash_tree))
2023+        d.addCallback(lambda ignored:
2024+            mw.put_salthashes(self.salt_hash_tree))
2025+        # Now try to put a block hash tree after the salt hash tree
2026+        # This won't necessarily overwrite the share hash chain, but it
2027+        # is a bad idea in general -- if we write one that is anything
2028+        # other than the exact size of the initial one, we will either
2029+        # overwrite the share hash chain, or give the reader (who uses
2030+        # the offset of the share hash chain as an end boundary) a
2031+        # shorter tree than they know to read, which will result in them
2032+        # reading junk. There is little reason to support it as a use
2033+        # case, so we should disallow it altogether.
2034+        d.addCallback(lambda ignored:
2035+            self.shouldFail(LayoutInvalid, "test same blockhashes",
2036+                            None,
2037+                            mw.put_blockhashes, self.block_hash_tree))
2038+        return d
2039+
2040+
2041+    def test_salt_hash_tree_after_share_hash_chain(self):
2042+        mw = self._make_new_mw("si1", 0)
2043+        d = defer.succeed(None)
2044+        # Put everything up to and including the share hash chain
2045+        for i in xrange(6):
2046+            d.addCallback(lambda ignored, i=i:
2047+                mw.put_block(self.block, i, self.salt))
2048+        d.addCallback(lambda ignored:
2049+            mw.put_encprivkey(self.encprivkey))
2050+        d.addCallback(lambda ignored:
2051+            mw.put_blockhashes(self.block_hash_tree))
2052+        d.addCallback(lambda ignored:
2053+            mw.put_salthashes(self.salt_hash_tree))
2054+        d.addCallback(lambda ignored:
2055+            mw.put_sharehashes(self.share_hash_chain))
2056+
2057+        # Now try to put the salt hash tree again. This should fail for
2058+        # the same reason that it fails in the previous test.
2059+        d.addCallback(lambda ignored:
2060+            self.shouldFail(LayoutInvalid, "test repeat salthashes",
2061+                            None,
2062+                            mw.put_salthashes, self.salt_hash_tree))
2063+        return d
2064+
2065+
2066+    def test_encprivkey_after_blockhashes(self):
2067+        mw = self._make_new_mw("si1", 0)
2068+        d = defer.succeed(None)
2069+        # Put everything up to and including the block hash tree
2070+        for i in xrange(6):
2071+            d.addCallback(lambda ignored, i=i:
2072+                mw.put_block(self.block, i, self.salt))
2073+        d.addCallback(lambda ignored:
2074+            mw.put_encprivkey(self.encprivkey))
2075+        d.addCallback(lambda ignored:
2076+            mw.put_blockhashes(self.block_hash_tree))
2077+        d.addCallback(lambda ignored:
2078+            self.shouldFail(LayoutInvalid, "out of order private key",
2079+                            None,
2080+                            mw.put_encprivkey, self.encprivkey))
2081+        return d
2082+
2083+
2084+    def test_share_hash_chain_after_signature(self):
2085+        mw = self._make_new_mw("si1", 0)
2086+        d = defer.succeed(None)
2087+        # Put everything up to and including the signature
2088+        for i in xrange(6):
2089+            d.addCallback(lambda ignored, i=i:
2090+                mw.put_block(self.block, i, self.salt))
2091+        d.addCallback(lambda ignored:
2092+            mw.put_encprivkey(self.encprivkey))
2093+        d.addCallback(lambda ignored:
2094+            mw.put_blockhashes(self.block_hash_tree))
2095+        d.addCallback(lambda ignored:
2096+            mw.put_salthashes(self.salt_hash_tree))
2097+        d.addCallback(lambda ignored:
2098+            mw.put_sharehashes(self.share_hash_chain))
2099+        d.addCallback(lambda ignored:
2100+            mw.put_root_hash(self.root_hash))
2101+        d.addCallback(lambda ignored:
2102+            mw.put_signature(self.signature))
2103+        # Now try to put the share hash chain again. This should fail
2104+        d.addCallback(lambda ignored:
2105+            self.shouldFail(LayoutInvalid, "out of order share hash chain",
2106+                            None,
2107+                            mw.put_sharehashes, self.share_hash_chain))
2108+        return d
2109+
2110+
2111+    def test_signature_after_verification_key(self):
2112+        mw = self._make_new_mw("si1", 0)
2113+        d = defer.succeed(None)
2114+        # Put everything up to and including the verification key.
2115+        for i in xrange(6):
2116+            d.addCallback(lambda ignored, i=i:
2117+                mw.put_block(self.block, i, self.salt))
2118+        d.addCallback(lambda ignored:
2119+            mw.put_encprivkey(self.encprivkey))
2120+        d.addCallback(lambda ignored:
2121+            mw.put_blockhashes(self.block_hash_tree))
2122+        d.addCallback(lambda ignored:
2123+            mw.put_salthashes(self.salt_hash_tree))
2124+        d.addCallback(lambda ignored:
2125+            mw.put_sharehashes(self.share_hash_chain))
2126+        d.addCallback(lambda ignored:
2127+            mw.put_root_hash(self.root_hash))
2128+        d.addCallback(lambda ignored:
2129+            mw.put_signature(self.signature))
2130+        d.addCallback(lambda ignored:
2131+            mw.put_verification_key(self.verification_key))
2132+        # Now try to put the signature again. This should fail
2133+        d.addCallback(lambda ignored:
2134+            self.shouldFail(LayoutInvalid, "signature after verification",
2135+                            None,
2136+                            mw.put_signature, self.signature))
2137+        return d
2138+
2139+
2140+    def test_uncoordinated_write(self):
2141+        # Make two mutable writers, both pointing to the same storage
2142+        # server, both at the same storage index, and try writing to the
2143+        # same share.
2144+        mw1 = self._make_new_mw("si1", 0)
2145+        mw2 = self._make_new_mw("si1", 0)
2146+        d = defer.succeed(None)
2147+        def _check_success(results):
2148+            result, readvs = results
2149+            self.failUnless(result)
2150+
2151+        def _check_failure(results):
2152+            result, readvs = results
2153+            self.failIf(result)
2154+
2155+        d.addCallback(lambda ignored:
2156+            mw1.put_block(self.block, 0, self.salt))
2157+        d.addCallback(_check_success)
2158+        d.addCallback(lambda ignored:
2159+            mw2.put_block(self.block, 0, self.salt))
2160+        d.addCallback(_check_failure)
2161+        return d
2162+
2163+
2164+    def test_invalid_salt_size(self):
2165+        # Salts need to be 16 bytes in size. Writes that attempt to
2166+        # write more or less than this should be rejected.
2167+        mw = self._make_new_mw("si1", 0)
2168+        invalid_salt = "a" * 17 # 17 bytes
2169+        another_invalid_salt = "b" * 15 # 15 bytes
2170+        d = defer.succeed(None)
2171+        d.addCallback(lambda ignored:
2172+            self.shouldFail(LayoutInvalid, "salt too big",
2173+                            None,
2174+                            mw.put_block, self.block, 0, invalid_salt))
2175+        d.addCallback(lambda ignored:
2176+            self.shouldFail(LayoutInvalid, "salt too small",
2177+                            None,
2178+                            mw.put_block, self.block, 0,
2179+                            another_invalid_salt))
2180+        return d
2181+
2182+
2183+    def test_write_test_vectors(self):
2184+        # If we give the write proxy a bogus test vector at
2185+        # any point during the process, it should fail to write.
2186+        mw = self._make_new_mw("si1", 0)
2187+        mw.set_checkstring("this is a lie")
2188+        # The initial write should be expecting to find the improbable
2189+        # checkstring above in place; finding nothing, it should fail.
2190+        d = defer.succeed(None)
2191+        d.addCallback(lambda ignored:
2192+            mw.put_block(self.block, 0, self.salt))
2193+        def _check_failure(results):
2194+            result, readv = results
2195+            self.failIf(result)
2196+        d.addCallback(_check_failure)
2197+        # Now set the checkstring to the empty string, which
2198+        # indicates that no share is there.
2199+        d.addCallback(lambda ignored:
2200+            mw.set_checkstring(""))
2201+        d.addCallback(lambda ignored:
2202+            mw.put_block(self.block, 0, self.salt))
2203+        def _check_success(results):
2204+            result, readv = results
2205+            self.failUnless(result)
2206+        d.addCallback(_check_success)
2207+        # Now set the checkstring to something wrong
2208+        d.addCallback(lambda ignored:
2209+            mw.set_checkstring("something wrong"))
2210+        # This should fail to do anything
2211+        d.addCallback(lambda ignored:
2212+            mw.put_block(self.block, 1, self.salt))
2213+        d.addCallback(_check_failure)
2214+        # Now set it back to what it should be.
2215+        d.addCallback(lambda ignored:
2216+            mw.set_checkstring(mw.get_checkstring()))
2217+        for i in xrange(1, 6):
2218+            d.addCallback(lambda ignored, i=i:
2219+                mw.put_block(self.block, i, self.salt))
2220+            d.addCallback(_check_success)
2221+        d.addCallback(lambda ignored:
2222+            mw.put_encprivkey(self.encprivkey))
2223+        d.addCallback(_check_success)
2224+        d.addCallback(lambda ignored:
2225+            mw.put_blockhashes(self.block_hash_tree))
2226+        d.addCallback(_check_success)
2227+        d.addCallback(lambda ignored:
2228+            mw.put_salthashes(self.salt_hash_tree))
2229+        d.addCallback(_check_success)
2230+        d.addCallback(lambda ignored:
2231+            mw.put_sharehashes(self.share_hash_chain))
2232+        d.addCallback(_check_success)
2233+        def _keep_old_checkstring(ignored):
2234+            self.old_checkstring = mw.get_checkstring()
2235+            mw.set_checkstring("foobarbaz")
2236+        d.addCallback(_keep_old_checkstring)
2237+        d.addCallback(lambda ignored:
2238+            mw.put_root_hash(self.root_hash))
2239+        d.addCallback(_check_failure)
2240+        d.addCallback(lambda ignored:
2241+            self.failUnlessEqual(self.old_checkstring, mw.get_checkstring()))
2242+        def _restore_old_checkstring(ignored):
2243+            mw.set_checkstring(self.old_checkstring)
2244+        d.addCallback(_restore_old_checkstring)
2245+        d.addCallback(lambda ignored:
2246+            mw.put_root_hash(self.root_hash))
2247+        d.addCallback(_check_success)
2248+        # The checkstring should have been set appropriately for us on
2249+        # the last write; if we try to change it to something else,
2250+        # that change should cause the verification key step to fail.
2251+        d.addCallback(lambda ignored:
2252+            mw.set_checkstring("something else"))
2253+        d.addCallback(lambda ignored:
2254+            mw.put_signature(self.signature))
2255+        d.addCallback(_check_failure)
2256+        d.addCallback(lambda ignored:
2257+            mw.set_checkstring(mw.get_checkstring()))
2258+        d.addCallback(lambda ignored:
2259+            mw.put_signature(self.signature))
2260+        d.addCallback(_check_success)
2261+        d.addCallback(lambda ignored:
2262+            mw.put_verification_key(self.verification_key))
2263+        d.addCallback(_check_success)
2264+        return d
2265+
2266+
2267+    def test_offset_only_set_on_success(self):
2268+        # The write proxy should be smart enough to detect when a write
2269+        # has failed, and to temper its definition of progress based on
2270+        # that.
2271+        mw = self._make_new_mw("si1", 0)
2272+        d = defer.succeed(None)
2273+        for i in xrange(1, 6):
2274+            d.addCallback(lambda ignored, i=i:
2275+                mw.put_block(self.block, i, self.salt))
2276+        def _break_checkstring(ignored):
2277+            self._old_checkstring = mw.get_checkstring()
2278+            mw.set_checkstring("foobarbaz")
2279+
2280+        def _fix_checkstring(ignored):
2281+            mw.set_checkstring(self._old_checkstring)
2282+
2283+        d.addCallback(_break_checkstring)
2284+
2285+        # Setting the encrypted private key shouldn't work now, which is
2286+        # to be expected and is tested elsewhere. We also want to make
2287+        # sure that we can't add the block hash tree after a failed
2288+        # write of this sort.
2289+        d.addCallback(lambda ignored:
2290+            mw.put_encprivkey(self.encprivkey))
2291+        d.addCallback(lambda ignored:
2292+            self.shouldFail(LayoutInvalid, "test out-of-order blockhashes",
2293+                            None,
2294+                            mw.put_blockhashes, self.block_hash_tree))
2295+        d.addCallback(_fix_checkstring)
2296+        d.addCallback(lambda ignored:
2297+            mw.put_encprivkey(self.encprivkey))
2298+        d.addCallback(_break_checkstring)
2299+        d.addCallback(lambda ignored:
2300+            mw.put_blockhashes(self.block_hash_tree))
2301+        d.addCallback(lambda ignored:
2302+            self.shouldFail(LayoutInvalid, "test out-of-order sharehashes",
2303+                            None,
2304+                            mw.put_sharehashes, self.share_hash_chain))
2305+        d.addCallback(_fix_checkstring)
2306+        d.addCallback(lambda ignored:
2307+            mw.put_blockhashes(self.block_hash_tree))
2308+        d.addCallback(lambda ignored:
2309+            mw.put_salthashes(self.salt_hash_tree))
2310+        d.addCallback(_break_checkstring)
2311+        d.addCallback(lambda ignored:
2312+            mw.put_sharehashes(self.share_hash_chain))
2313+        d.addCallback(lambda ignored:
2314+            self.shouldFail(LayoutInvalid, "out-of-order root hash",
2315+                            None,
2316+                            mw.put_root_hash, self.root_hash))
2317+        d.addCallback(_fix_checkstring)
2318+        d.addCallback(lambda ignored:
2319+            mw.put_sharehashes(self.share_hash_chain))
2320+        d.addCallback(_break_checkstring)
2321+        d.addCallback(lambda ignored:
2322+            mw.put_root_hash(self.root_hash))
2323+        d.addCallback(lambda ignored:
2324+            self.shouldFail(LayoutInvalid, "out-of-order signature",
2325+                            None,
2326+                            mw.put_signature, self.signature))
2327+        d.addCallback(_fix_checkstring)
2328+        d.addCallback(lambda ignored:
2329+            mw.put_root_hash(self.root_hash))
2330+        d.addCallback(_break_checkstring)
2331+        d.addCallback(lambda ignored:
2332+            mw.put_signature(self.signature))
2333+        d.addCallback(lambda ignored:
2334+            self.shouldFail(LayoutInvalid, "out-of-order verification key",
2335+                            None,
2336+                            mw.put_verification_key,
2337+                            self.verification_key))
2338+        d.addCallback(_fix_checkstring)
2339+        d.addCallback(lambda ignored:
2340+            mw.put_signature(self.signature))
2341+        d.addCallback(_break_checkstring)
2342+        d.addCallback(lambda ignored:
2343+            mw.put_verification_key(self.verification_key))
2344+        d.addCallback(lambda ignored:
2345+            self.shouldFail(LayoutInvalid, "out-of-order finish",
2346+                            None,
2347+                            mw.finish_publishing))
2348+        return d
2349+
2350+
2351+    def serialize_blockhashes(self, blockhashes):
2352+        return "".join(blockhashes)
2353+
2354+
2355+    def serialize_sharehashes(self, sharehashes):
2356+        ret = "".join([struct.pack(">H32s", i, sharehashes[i])
2357+                        for i in sorted(sharehashes.keys())])
2358+        return ret
2359+
2360+
2361+    def test_write(self):
2362+        # This translates to a file with 6 6-byte segments, and with 2-byte
2363+        # blocks.
2364+        mw = self._make_new_mw("si1", 0)
2365+        mw2 = self._make_new_mw("si1", 1)
2366+        # Test writing some blocks.
2367+        read = self.ss.remote_slot_readv
2368+        expected_salt_offset = struct.calcsize(MDMFHEADER)
2369+        expected_share_offset = expected_salt_offset + (16 * 6)
2370+        def _check_block_write(i, share):
2371+            self.failUnlessEqual(read("si1", [share], [(expected_share_offset + (i * 2), 2)]),
2372+                                {share: [self.block]})
2373+            self.failUnlessEqual(read("si1", [share], [(expected_salt_offset + (i * 16), 16)]),
2374+                                 {share: [self.salt]})
2375+        d = defer.succeed(None)
2376+        for i in xrange(6):
2377+            d.addCallback(lambda ignored, i=i:
2378+                mw.put_block(self.block, i, self.salt))
2379+            d.addCallback(lambda ignored, i=i:
2380+                _check_block_write(i, 0))
2381+        # Now try the same thing, but with share 1 instead of share 0.
2382+        for i in xrange(6):
2383+            d.addCallback(lambda ignored, i=i:
2384+                mw2.put_block(self.block, i, self.salt))
2385+            d.addCallback(lambda ignored, i=i:
2386+                _check_block_write(i, 1))
2387+
2388+        # Next, we make a fake encrypted private key, and put it onto the
2389+        # storage server.
2390+        d.addCallback(lambda ignored:
2391+            mw.put_encprivkey(self.encprivkey))
2392+        # So far, we have:
2393+        #  header:  143 bytes
2394+        #  salts:   16 * 6 = 96 bytes
2395+        #  blocks:  2 * 6 = 12 bytes
2396+        #   = 251 bytes
2397+        expected_private_key_offset = expected_share_offset + len(self.block) * 6
2398+        self.failUnlessEqual(len(self.encprivkey), 7)
2399+        d.addCallback(lambda ignored:
2400+            self.failUnlessEqual(read("si1", [0], [(expected_private_key_offset, 7)]),
2401+                                 {0: [self.encprivkey]}))
2402+
2403+        # Next, we put a fake block hash tree.
2404+        d.addCallback(lambda ignored:
2405+            mw.put_blockhashes(self.block_hash_tree))
2406+        # The block hash tree got inserted at:
2407+        #  header + salts + blocks: 251 bytes
2408+        #  encrypted private key:   7 bytes
2409+        #       = 258 bytes
2410+        expected_block_hash_offset = expected_private_key_offset + len(self.encprivkey)
2411+        self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6)
2412+        d.addCallback(lambda ignored:
2413+            self.failUnlessEqual(read("si1", [0], [(expected_block_hash_offset, 32 * 6)]),
2414+                                 {0: [self.block_hash_tree_s]}))
2415+
2416+        # Next, we put a fake salt hash tree.
2417+        d.addCallback(lambda ignored:
2418+            mw.put_salthashes(self.salt_hash_tree))
2419+        # The salt hash tree got inserted at
2420+        # header + salts + blocks + private key = 258 bytes
2421+        # block hash tree:          32 * 6 = 192 bytes
2422+        #   = 450 bytes
2423+        expected_salt_hash_offset = expected_block_hash_offset + len(self.block_hash_tree_s)
2424+        d.addCallback(lambda ignored:
2425+            self.failUnlessEqual(read("si1", [0], [(expected_salt_hash_offset, 32 * 5)]), {0: [self.salt_hash_tree_s]}))
2426+
2427+        # Next, put a fake share hash chain
2428+        d.addCallback(lambda ignored:
2429+            mw.put_sharehashes(self.share_hash_chain))
2430+        # The share hash chain got inserted at:
2431+        # header + salts + blocks + private key = 258 bytes
2432+        # block hash tree:                        32 * 6 = 192 bytes
2433+        # salt hash tree:                         32 * 5 = 160 bytes
2434+        #   = 610
2435+        expected_share_hash_offset = expected_salt_hash_offset + len(self.salt_hash_tree_s)
2436+        d.addCallback(lambda ignored:
2437+            self.failUnlessEqual(read("si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]),
2438+                                 {0: [self.share_hash_chain_s]}))
2439+
2440+        # Next, we put what is supposed to be the root hash of
2441+        # our share hash tree but isn't       
2442+        d.addCallback(lambda ignored:
2443+            mw.put_root_hash(self.root_hash))
2444+        # The root hash gets inserted at byte 9 (its position is in the header,
2445+        # and is fixed). The salt hash is right after it.
2446+        def _check(ignored):
2447+            self.failUnlessEqual(read("si1", [0], [(9, 32)]),
2448+                                 {0: [self.root_hash]})
2449+            self.failUnlessEqual(read("si1", [0], [(41, 32)]),
2450+                                 {0: [self.salt_hash]})
2451+        d.addCallback(_check)
2452+
2453+        # Next, we put a signature of the header block.
2454+        d.addCallback(lambda ignored:
2455+            mw.put_signature(self.signature))
2456+        # The signature gets written to:
2457+        #   header + salts + blocks + private key + block/salt hash trees + share hash chain = 814
2458+        expected_signature_offset = expected_share_hash_offset + len(self.share_hash_chain_s)
2459+        self.failUnlessEqual(len(self.signature), 9)
2460+        d.addCallback(lambda ignored:
2461+            self.failUnlessEqual(read("si1", [0], [(expected_signature_offset, 9)]),
2462+                                 {0: [self.signature]}))
2463+
2464+        # Next, we put the verification key
2465+        d.addCallback(lambda ignored:
2466+            mw.put_verification_key(self.verification_key))
2467+        # The verification key gets written to:
2468+        #   814 + 9 = 823 bytes
2469+        expected_verification_key_offset = expected_signature_offset + len(self.signature)
2470+        self.failUnlessEqual(len(self.verification_key), 6)
2471+        d.addCallback(lambda ignored:
2472+            self.failUnlessEqual(read("si1", [0], [(expected_verification_key_offset, 6)]),
2473+                                 {0: [self.verification_key]}))
2474+
2475+        def _check_signable(ignored):
2476+            # Make sure that the signable is what we think it should be.
2477+            signable = mw.get_signable()
2478+            verno, seq, roothash, salthash, k, n, segsize, datalen = \
2479+                                            struct.unpack(">BQ32s32sBBQQ",
2480+                                                          signable)
2481+            self.failUnlessEqual(verno, 1)
2482+            self.failUnlessEqual(seq, 0)
2483+            self.failUnlessEqual(roothash, self.root_hash)
2484+            self.failUnlessEqual(salthash, self.salt_hash)
2485+            self.failUnlessEqual(k, 3)
2486+            self.failUnlessEqual(n, 10)
2487+            self.failUnlessEqual(segsize, 6)
2488+            self.failUnlessEqual(datalen, 36)
2489+        d.addCallback(_check_signable)
2490+        # Next, we cause the offset table to be published.
2491+        d.addCallback(lambda ignored:
2492+            mw.finish_publishing())
2493+        expected_eof_offset = expected_verification_key_offset + len(self.verification_key)
2494+
2495+        # The offset table starts at byte 91. Happily, we have already
2496+        # worked out most of these offsets above, but we want to make
2497+        # sure that the representation on disk agrees with what we've
2498+        # calculated.
2499+        #
2500+        # (we don't have an explicit offset for the AES salts, because
2501+        # we know that they start right after the header)
2502+        def _check_offsets(ignored):
2503+            # Check the version number to make sure that it is correct.
2504+            expected_version_number = struct.pack(">B", 1)
2505+            self.failUnlessEqual(read("si1", [0], [(0, 1)]),
2506+                                 {0: [expected_version_number]})
2507+            # Check the sequence number to make sure that it is correct
2508+            expected_sequence_number = struct.pack(">Q", 0)
2509+            self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2510+                                 {0: [expected_sequence_number]})
2511+            # Check that the encoding parameters (k, N, segment size, data
2512+            # length) are what they should be. These are 3, 10, 6, 36
2513+            expected_k = struct.pack(">B", 3)
2514+            self.failUnlessEqual(read("si1", [0], [(73, 1)]),
2515+                                 {0: [expected_k]})
2516+            expected_n = struct.pack(">B", 10)
2517+            self.failUnlessEqual(read("si1", [0], [(74, 1)]),
2518+                                 {0: [expected_n]})
2519+            expected_segment_size = struct.pack(">Q", 6)
2520+            self.failUnlessEqual(read("si1", [0], [(75, 8)]),
2521+                                 {0: [expected_segment_size]})
2522+            expected_data_length = struct.pack(">Q", 36)
2523+            self.failUnlessEqual(read("si1", [0], [(83, 8)]),
2524+                                 {0: [expected_data_length]})
2525+            # 91          4           The offset of the share data
2526+            expected_offset = struct.pack(">L", expected_share_offset)
2527+            self.failUnlessEqual(read("si1", [0], [(91, 4)]),
2528+                                 {0: [expected_offset]})
2529+            # 95          8           The offset of the encrypted private key
2530+            expected_offset = struct.pack(">Q", expected_private_key_offset)
2531+            self.failUnlessEqual(read("si1", [0], [(95, 8)]),
2532+                                 {0: [expected_offset]})
2533+            # 103         8           The offset of the block hash tree
2534+            expected_offset = struct.pack(">Q", expected_block_hash_offset)
2535+            self.failUnlessEqual(read("si1", [0], [(103, 8)]),
2536+                                 {0: [expected_offset]})
2537+            # 111         8           The offset of the salt hash tree
2538+            expected_offset = struct.pack(">Q", expected_salt_hash_offset)
2539+            self.failUnlessEqual(read("si1", [0], [(111, 8)]),
2540+                                 {0: [expected_offset]})
2541+            # 119         8           The offset of the share hash chain
2542+            expected_offset = struct.pack(">Q", expected_share_hash_offset)
2543+            self.failUnlessEqual(read("si1", [0], [(119, 8)]),
2544+                                 {0: [expected_offset]})
2545+            # 127         8           The offset of the signature
2546+            expected_offset = struct.pack(">Q", expected_signature_offset)
2547+            self.failUnlessEqual(read("si1", [0], [(127, 8)]),
2548+                                 {0: [expected_offset]})
2549+            # 135         8           The offset of the verification key
2550+            expected_offset = struct.pack(">Q", expected_verification_key_offset)
2551+            self.failUnlessEqual(read("si1", [0], [(135, 8)]),
2552+                                 {0: [expected_offset]})
2553+            # 143         8           The offset of the EOF
2554+            expected_offset = struct.pack(">Q", expected_eof_offset)
2555+            self.failUnlessEqual(read("si1", [0], [(143, 8)]),
2556+                                 {0: [expected_offset]})
2557+        d.addCallback(_check_offsets)
2558+        return d
2559+
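 The reads above imply a fixed header layout; here is a minimal parsing
 sketch (the helper below is hypothetical and not part of the patch -- the
 format string is inferred from the struct.unpack call and the offsets
 checked in _check_offsets):

    import struct

    # Signable prefix (91 bytes) followed by the offset table (60 bytes).
    MDMF_HEADER = ">BQ32s32sBBQQ" "LQQQQQQQ"   # 151 bytes total

    def parse_mdmf_header(data):
        # Returns (version, seqnum, root hash, salt hash, k, N, segment
        # size, data length) followed by the eight offsets checked above.
        return struct.unpack(MDMF_HEADER, data[:struct.calcsize(MDMF_HEADER)])
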
2560+    def _make_new_mw(self, si, share, datalength=36):
2561+        # This is a file of size 36 bytes. Since it has a segment
2562+        # size of 6, we know that it has six 6-byte segments, each of
2563+        # which will be split into 2-byte blocks because our FEC k
2564+        # parameter is 3.
2565+        mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10,
2566+                                6, datalength)
2567+        return mw
2568+
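 A quick illustration of that arithmetic (values are the defaults used
 above; illustrative only, not part of the patch):

    segment_size, k, datalength = 6, 3, 36
    block_size = segment_size // k              # 2-byte blocks in each share
    num_segments = datalength // segment_size   # 6 segments
    share_data = block_size * num_segments      # 12 bytes of blocks per share
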
2569+
2570+    def test_write_rejected_with_too_many_blocks(self):
2571+        mw = self._make_new_mw("si0", 0)
2572+
2573+        # Try writing too many blocks. We should not be able to write
2574+        # more than 6 blocks into each share.
2576+        d = defer.succeed(None)
2577+        for i in xrange(6):
2578+            d.addCallback(lambda ignored, i=i:
2579+                mw.put_block(self.block, i, self.salt))
2580+        d.addCallback(lambda ignored:
2581+            self.shouldFail(LayoutInvalid, "too many blocks",
2582+                            None,
2583+                            mw.put_block, self.block, 7, self.salt))
2584+        return d
2585+
2586+
2587+    def test_write_rejected_with_invalid_salt(self):
2588+        # Try writing an invalid salt. Salts are 16 bytes -- any more or
2589+        # less should cause an error.
2590+        mw = self._make_new_mw("si1", 0)
2591+        bad_salt = "a" * 17 # 17 bytes
2592+        d = defer.succeed(None)
2593+        d.addCallback(lambda ignored:
2594+            self.shouldFail(LayoutInvalid, "test_invalid_salt",
2595+                            None, mw.put_block, self.block, 7, bad_salt))
2596+        return d
2597+
2598+
2599+    def test_write_rejected_with_invalid_root_hash(self):
2600+        # Try writing an invalid root hash. This should be SHA256d, and
2601+        # 32 bytes long as a result.
2602+        mw = self._make_new_mw("si2", 0)
2603+        # 17 bytes != 32 bytes
2604+        invalid_root_hash = "a" * 17
2605+        d = defer.succeed(None)
2606+        # Before this test can work, we need to put some blocks + salts,
2607+        # a block hash tree, and a share hash tree. Otherwise, we'll see
2608+        # failures that match what we are looking for, but are caused by
2609+        # the constraints imposed on operation ordering.
2610+        for i in xrange(6):
2611+            d.addCallback(lambda ignored, i=i:
2612+                mw.put_block(self.block, i, self.salt))
2613+        d.addCallback(lambda ignored:
2614+            mw.put_encprivkey(self.encprivkey))
2615+        d.addCallback(lambda ignored:
2616+            mw.put_blockhashes(self.block_hash_tree))
2617+        d.addCallback(lambda ignored:
2618+            mw.put_salthashes(self.salt_hash_tree))
2619+        d.addCallback(lambda ignored:
2620+            mw.put_sharehashes(self.share_hash_chain))
2621+        d.addCallback(lambda ignored:
2622+            self.shouldFail(LayoutInvalid, "invalid root hash",
2623+                            None, mw.put_root_hash, invalid_root_hash))
2624+        return d
2625+
2626+
2627+    def test_write_rejected_with_invalid_blocksize(self):
2628+        # The blocksize implied by the writer that we get from
2629+        # _make_new_mw is 2 bytes -- any more or any less than this
2630+        # should cause a failure, unless it is the tail segment, in
2631+        # which case a smaller block may be acceptable.
2632+        invalid_block = "a"
2633+        mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with
2634+                                             # one byte blocks
2635+        # 1 byte != 2 bytes
2636+        d = defer.succeed(None)
2637+        d.addCallback(lambda ignored, invalid_block=invalid_block:
2638+            self.shouldFail(LayoutInvalid, "test blocksize too small",
2639+                            None, mw.put_block, invalid_block, 0,
2640+                            self.salt))
2641+        invalid_block = invalid_block * 3
2642+        # 3 bytes != 2 bytes
2643+        d.addCallback(lambda ignored:
2644+            self.shouldFail(LayoutInvalid, "test blocksize too large",
2645+                            None,
2646+                            mw.put_block, invalid_block, 0, self.salt))
2647+        for i in xrange(5):
2648+            d.addCallback(lambda ignored, i=i:
2649+                mw.put_block(self.block, i, self.salt))
2650+        # Try to put an invalid tail segment
2651+        d.addCallback(lambda ignored:
2652+            self.shouldFail(LayoutInvalid, "test invalid tail segment",
2653+                            None,
2654+                            mw.put_block, self.block, 5, self.salt))
2655+        valid_block = "a"
2656+        d.addCallback(lambda ignored:
2657+            mw.put_block(valid_block, 5, self.salt))
2658+        return d
2659+
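 A small sketch (illustrative only) of why a 33-byte file yields one-byte
 tail blocks with these parameters:

    datalength, segment_size, k = 33, 6, 3
    full_segments = datalength // segment_size   # 5 full 6-byte segments
    tail_data = datalength % segment_size        # 3 bytes in the tail segment
    tail_block_size = tail_data // k             # 1-byte blocks in the tail
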
2660+
2661+    def test_write_enforces_order_constraints(self):
2662+        # We require that the MDMFSlotWriteProxy be interacted with in a
2663+        # specific way.
2664+        # That way is:
2665+        # 0: __init__
2666+        # 1: write blocks and salts
2667+        # 2: Write the encrypted private key
2668+        # 3: Write the block hashes
2669+        # 4: Write the share hashes
2670+        # 5: Write the root hash and salt hash
2671+        # 6: Write the signature and verification key
2672+        # 7: Write the file.
2673+        #
2674+        # Some of these can be performed out-of-order, and some can't.
2675+        # The dependencies that I want to test here are:
2676+        #  - Private key before block hashes
2677+        #  - share hashes and block hashes before root hash
2678+        #  - root hash before signature
2679+        #  - signature before verification key
2680+        mw0 = self._make_new_mw("si0", 0)
2681+        # Write some shares
2682+        d = defer.succeed(None)
2683+        for i in xrange(6):
2684+            d.addCallback(lambda ignored, i=i:
2685+                mw0.put_block(self.block, i, self.salt))
2686+        # Try to write the block hashes before writing the encrypted
2687+        # private key
2688+        d.addCallback(lambda ignored:
2689+            self.shouldFail(LayoutInvalid, "block hashes before key",
2690+                            None, mw0.put_blockhashes,
2691+                            self.block_hash_tree))
2692+
2693+        # Write the private key.
2694+        d.addCallback(lambda ignored:
2695+            mw0.put_encprivkey(self.encprivkey))
2696+
2697+
2698+        # Try to write the salt hash tree without writing the block hash
2699+        # tree.
2700+        d.addCallback(lambda ignored:
2701+            self.shouldFail(LayoutInvalid, "salt hash tree before bht",
2702+                            None,
2703+                            mw0.put_salthashes, self.salt_hash_tree))
2704+
2705+
2706+        # Try to write the share hash chain without writing the salt
2707+        # hash tree
2708+        d.addCallback(lambda ignored:
2709+            self.shouldFail(LayoutInvalid, "share hash chain before "
2710+                                           "salt hash tree",
2711+                            None,
2712+                            mw0.put_sharehashes, self.share_hash_chain))
2713+
2714+        # Try to write the root hash without writing the block hashes,
2715+        # the salt hashes, or the share hashes
2716+        d.addCallback(lambda ignored:
2717+            self.shouldFail(LayoutInvalid, "root hash before share hashes",
2718+                            None,
2719+                            mw0.put_root_hash, self.root_hash))
2720+
2721+        # Now write the block hashes and try again
2722+        d.addCallback(lambda ignored:
2723+            mw0.put_blockhashes(self.block_hash_tree))
2724+
2725+        d.addCallback(lambda ignored:
2726+            self.shouldFail(LayoutInvalid, "share hash before salt hashes",
2727+                            None,
2728+                            mw0.put_sharehashes, self.share_hash_chain))
2729+        d.addCallback(lambda ignored:
2730+            self.shouldFail(LayoutInvalid, "root hash before share hashes",
2731+                            None, mw0.put_root_hash, self.root_hash))
2732+
2733+        # We haven't yet put the root hash on the share, so we shouldn't
2734+        # be able to sign it.
2735+        d.addCallback(lambda ignored:
2736+            self.shouldFail(LayoutInvalid, "signature before root hash",
2737+                            None, mw0.put_signature, self.signature))
2738+
2739+        d.addCallback(lambda ignored:
2740+            self.failUnlessRaises(LayoutInvalid, mw0.get_signable))
2741+
2742+        # ..and, since that fails, we also shouldn't be able to put the
2743+        # verification key.
2744+        d.addCallback(lambda ignored:
2745+            self.shouldFail(LayoutInvalid, "key before signature",
2746+                            None, mw0.put_verification_key,
2747+                            self.verification_key))
2748+
2749+        # Now write the salt hashes, and try again.
2750+        d.addCallback(lambda ignored:
2751+            mw0.put_salthashes(self.salt_hash_tree))
2752+
2753+        d.addCallback(lambda ignored:
2754+            self.shouldFail(LayoutInvalid, "root hash before share hashes",
2755+                            None,
2756+                            mw0.put_root_hash, self.root_hash))
2757+
2758+        # We should still be unable to sign the header
2759+        d.addCallback(lambda ignored:
2760+            self.shouldFail(LayoutInvalid, "signature before hashes",
2761+                            None,
2762+                            mw0.put_signature, self.signature))
2763+
2764+        # Now write the share hashes.
2765+        d.addCallback(lambda ignored:
2766+            mw0.put_sharehashes(self.share_hash_chain))
2767+        # We should be able to write the root hash now too
2768+        d.addCallback(lambda ignored:
2769+            mw0.put_root_hash(self.root_hash))
2770+
2771+        # We should still be unable to put the verification key
2772+        d.addCallback(lambda ignored:
2773+            self.shouldFail(LayoutInvalid, "key before signature",
2774+                            None, mw0.put_verification_key,
2775+                            self.verification_key))
2776+
2777+        d.addCallback(lambda ignored:
2778+            mw0.put_signature(self.signature))
2779+
2780+        # We shouldn't be able to write the offsets to the remote server
2781+        # until the offset table is finished; IOW, until we have written
2782+        # the verification key.
2783+        d.addCallback(lambda ignored:
2784+            self.shouldFail(LayoutInvalid, "offsets before verification key",
2785+                            None,
2786+                            mw0.finish_publishing))
2787+
2788+        d.addCallback(lambda ignored:
2789+            mw0.put_verification_key(self.verification_key))
2790+        return d
2791+
2792+
2793+    def test_end_to_end(self):
2794+        mw = self._make_new_mw("si1", 0)
2795+        # Write a share using the mutable writer, and make sure that the
2796+        # reader knows how to read everything back to us.
2797+        d = defer.succeed(None)
2798+        for i in xrange(6):
2799+            d.addCallback(lambda ignored, i=i:
2800+                mw.put_block(self.block, i, self.salt))
2801+        d.addCallback(lambda ignored:
2802+            mw.put_encprivkey(self.encprivkey))
2803+        d.addCallback(lambda ignored:
2804+            mw.put_blockhashes(self.block_hash_tree))
2805+        d.addCallback(lambda ignored:
2806+            mw.put_salthashes(self.salt_hash_tree))
2807+        d.addCallback(lambda ignored:
2808+            mw.put_sharehashes(self.share_hash_chain))
2809+        d.addCallback(lambda ignored:
2810+            mw.put_root_hash(self.root_hash))
2811+        d.addCallback(lambda ignored:
2812+            mw.put_signature(self.signature))
2813+        d.addCallback(lambda ignored:
2814+            mw.put_verification_key(self.verification_key))
2815+        d.addCallback(lambda ignored:
2816+            mw.finish_publishing())
2817+
2818+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2819+        def _check_block_and_salt((block, salt)):
2820+            self.failUnlessEqual(block, self.block)
2821+            self.failUnlessEqual(salt, self.salt)
2822+
2823+        for i in xrange(6):
2824+            d.addCallback(lambda ignored, i=i:
2825+                mr.get_block_and_salt(i))
2826+            d.addCallback(_check_block_and_salt)
2827+
2828+        d.addCallback(lambda ignored:
2829+            mr.get_encprivkey())
2830+        d.addCallback(lambda encprivkey:
2831+            self.failUnlessEqual(self.encprivkey, encprivkey))
2832+
2833+        d.addCallback(lambda ignored:
2834+            mr.get_blockhashes())
2835+        d.addCallback(lambda blockhashes:
2836+            self.failUnlessEqual(self.block_hash_tree, blockhashes))
2837+
2838+        d.addCallback(lambda ignored:
2839+            mr.get_sharehashes())
2840+        d.addCallback(lambda sharehashes:
2841+            self.failUnlessEqual(self.share_hash_chain, sharehashes))
2842+
2843+        d.addCallback(lambda ignored:
2844+            mr.get_signature())
2845+        d.addCallback(lambda signature:
2846+            self.failUnlessEqual(signature, self.signature))
2847+
2848+        d.addCallback(lambda ignored:
2849+            mr.get_verification_key())
2850+        d.addCallback(lambda verification_key:
2851+            self.failUnlessEqual(verification_key, self.verification_key))
2852+
2853+        d.addCallback(lambda ignored:
2854+            mr.get_seqnum())
2855+        d.addCallback(lambda seqnum:
2856+            self.failUnlessEqual(seqnum, 0))
2857+
2858+        d.addCallback(lambda ignored:
2859+            mr.get_root_hash())
2860+        d.addCallback(lambda root_hash:
2861+            self.failUnlessEqual(self.root_hash, root_hash))
2862+
2863+        d.addCallback(lambda ignored:
2864+            mr.get_salt_hash())
2865+        d.addCallback(lambda salt_hash:
2866+            self.failUnlessEqual(self.salt_hash, salt_hash))
2867+
2868+        d.addCallback(lambda ignored:
2869+            mr.get_encoding_parameters())
2870+        def _check_encoding_parameters((k, n, segsize, datalen)):
2871+            self.failUnlessEqual(k, 3)
2872+            self.failUnlessEqual(n, 10)
2873+            self.failUnlessEqual(segsize, 6)
2874+            self.failUnlessEqual(datalen, 36)
2875+        d.addCallback(_check_encoding_parameters)
2876+
2877+        d.addCallback(lambda ignored:
2878+            mr.get_checkstring())
2879+        d.addCallback(lambda checkstring:
2880+            self.failUnlessEqual(checkstring, mw.get_checkstring()))
2881+        return d
2882+
2883+
2884+    def test_is_sdmf(self):
2885+        # The MDMFSlotReadProxy should also know how to read SDMF files,
2886+        # since it will encounter them on the grid. Callers use the
2887+        # is_sdmf method to test this.
2888+        self.write_sdmf_share_to_server("si1")
2889+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2890+        d = mr.is_sdmf()
2891+        d.addCallback(lambda issdmf:
2892+            self.failUnless(issdmf))
2893+        return d
2894+
2895+
2896+    def test_reads_sdmf(self):
2897+        # The slot read proxy should, naturally, know how to tell us
2898+        # about data in the SDMF format
2899+        self.write_sdmf_share_to_server("si1")
2900+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2901+        d = defer.succeed(None)
2902+        d.addCallback(lambda ignored:
2903+            mr.is_sdmf())
2904+        d.addCallback(lambda issdmf:
2905+            self.failUnless(issdmf))
2906+
2907+        # What do we need to read?
2908+        #  - The sharedata
2909+        #  - The salt
2910+        d.addCallback(lambda ignored:
2911+            mr.get_block_and_salt(0))
2912+        def _check_block_and_salt(results):
2913+            block, salt = results
2914+            # Our original file is 36 bytes long, so each share is 12
2915+            # bytes in size. The share is composed entirely of the
2916+            # letter a. self.block contains two 'a's, so 6 * self.block is
2917+            # what we are looking for.
2918+            self.failUnlessEqual(block, self.block * 6)
2919+            self.failUnlessEqual(salt, self.salt)
2920+        d.addCallback(_check_block_and_salt)
2921+
2922+        #  - The blockhashes
2923+        d.addCallback(lambda ignored:
2924+            mr.get_blockhashes())
2925+        d.addCallback(lambda blockhashes:
2926+            self.failUnlessEqual(self.block_hash_tree,
2927+                                 blockhashes,
2928+                                 blockhashes))
2929+        #  - The sharehashes
2930+        d.addCallback(lambda ignored:
2931+            mr.get_sharehashes())
2932+        d.addCallback(lambda sharehashes:
2933+            self.failUnlessEqual(self.share_hash_chain,
2934+                                 sharehashes))
2935+        #  - The keys
2936+        d.addCallback(lambda ignored:
2937+            mr.get_encprivkey())
2938+        d.addCallback(lambda encprivkey:
2939+            self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey))
2940+        d.addCallback(lambda ignored:
2941+            mr.get_verification_key())
2942+        d.addCallback(lambda verification_key:
2943+            self.failUnlessEqual(verification_key,
2944+                                 self.verification_key,
2945+                                 verification_key))
2946+        #  - The signature
2947+        d.addCallback(lambda ignored:
2948+            mr.get_signature())
2949+        d.addCallback(lambda signature:
2950+            self.failUnlessEqual(signature, self.signature, signature))
2951+
2952+        #  - The sequence number
2953+        d.addCallback(lambda ignored:
2954+            mr.get_seqnum())
2955+        d.addCallback(lambda seqnum:
2956+            self.failUnlessEqual(seqnum, 0, seqnum))
2957+
2958+        #  - The root hash
2959+        #  - The salt hash (to verify that it is None)
2960+        d.addCallback(lambda ignored:
2961+            mr.get_root_hash())
2962+        d.addCallback(lambda root_hash:
2963+            self.failUnlessEqual(root_hash, self.root_hash, root_hash))
2964+        d.addCallback(lambda ignored:
2965+            mr.get_salt_hash())
2966+        d.addCallback(lambda salt_hash:
2967+            self.failIf(salt_hash))
2968+        return d
2969+
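 The share-size arithmetic behind the comment in that test, as a short
 sketch (illustrative, not part of the patch):

    datalength, k = 36, 3
    segment_size = datalength             # SDMF files have exactly one segment
    sdmf_block_size = segment_size // k   # 12 bytes of share data per share
    # self.block is two bytes of "a", so the SDMF block equals self.block * 6
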
2970+
2971+    def test_only_reads_one_segment_sdmf(self):
2972+        # SDMF shares have only one segment, so it doesn't make sense to
2973+        # read more segments than that. The reader should know this and
2974+        # complain if we try to do that.
2975+        self.write_sdmf_share_to_server("si1")
2976+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2977+        d = defer.succeed(None)
2978+        d.addCallback(lambda ignored:
2979+            mr.is_sdmf())
2980+        d.addCallback(lambda issdmf:
2981+            self.failUnless(issdmf))
2982+        d.addCallback(lambda ignored:
2983+            self.shouldFail(LayoutInvalid, "test bad segment",
2984+                            None,
2985+                            mr.get_block_and_salt, 1))
2986+        return d
2987+
2988+
2989+    def test_read_with_prefetched_mdmf_data(self):
2990+        # The MDMFSlotReadProxy will prefill certain fields if you pass
2991+        # it data that you have already fetched. This is useful for
2992+        # cases like the Servermap, which prefetches ~2kb of data while
2993+        # finding out which shares are on the remote peer so that it
2994+        # doesn't waste round trips.
2995+        mdmf_data = self.build_test_mdmf_share()
2996+        self.write_test_share_to_server("si1")
2997+        def _make_mr(ignored, length):
2998+            mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length])
2999+            return mr
3000+
3001+        d = defer.succeed(None)
3002+        # This should be enough to fill in both the encoding parameters
3003+        # and the table of offsets, which will complete the version
3004+        # information tuple.
3005+        d.addCallback(_make_mr, 151)
3006+        d.addCallback(lambda mr:
3007+            mr.get_verinfo())
3008+        def _check_verinfo(verinfo):
3009+            self.failUnless(verinfo)
3010+            self.failUnlessEqual(len(verinfo), 9)
3011+            (seqnum,
3012+             root_hash,
3013+             salt_hash,
3014+             segsize,
3015+             datalen,
3016+             k,
3017+             n,
3018+             prefix,
3019+             offsets) = verinfo
3020+            self.failUnlessEqual(seqnum, 0)
3021+            self.failUnlessEqual(root_hash, self.root_hash)
3022+            self.failUnlessEqual(salt_hash, self.salt_hash)
3023+            self.failUnlessEqual(segsize, 6)
3024+            self.failUnlessEqual(datalen, 36)
3025+            self.failUnlessEqual(k, 3)
3026+            self.failUnlessEqual(n, 10)
3027+            expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
3028+                                          1,
3029+                                          seqnum,
3030+                                          root_hash,
3031+                                          salt_hash,
3032+                                          k,
3033+                                          n,
3034+                                          segsize,
3035+                                          datalen)
3036+            self.failUnlessEqual(expected_prefix, prefix)
3037+            self.failUnlessEqual(self.rref.read_count, 0)
3038+        d.addCallback(_check_verinfo)
3039+        # This is not enough data to read a block and a salt, so the
3040+        # wrapper should attempt to read this from the remote server.
3041+        d.addCallback(_make_mr, 151)
3042+        d.addCallback(lambda mr:
3043+            mr.get_block_and_salt(0))
3044+        def _check_block_and_salt((block, salt)):
3045+            self.failUnlessEqual(block, self.block)
3046+            self.failUnlessEqual(salt, self.salt)
3047+            self.failUnlessEqual(self.rref.read_count, 1)
3048+        # The file that we're playing with has 6 segments, so there
3049+        # are 6 * 16 = 96 bytes of salts between the header and the share
3050+        # data. Each block is two bytes, so 151 + 96 + 2 = 249 bytes should
3051+        # be enough to read one block.
3052+        d.addCallback(_make_mr, 249)
3053+        d.addCallback(lambda mr:
3054+            mr.get_block_and_salt(0))
3055+        d.addCallback(_check_block_and_salt)
3056+        return d
3057+
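 The byte counts used in that test can be reconstructed as follows (a
 sketch assuming, as the comments above state, a 151-byte header plus
 offset table; not part of the patch):

    header_and_offsets = 151        # signable prefix (91) + offset table (60)
    salts = 6 * 16                  # one 16-byte salt per segment
    one_block = 2                   # segment size 6 / k 3
    enough_for_verinfo = header_and_offsets                        # 151
    enough_for_one_block = header_and_offsets + salts + one_block  # 249
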
3058+
3059+    def test_read_with_prefetched_sdmf_data(self):
3060+        sdmf_data = self.build_test_sdmf_share()
3061+        self.write_sdmf_share_to_server("si1")
3062+        def _make_mr(ignored, length):
3063+            mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length])
3064+            return mr
3065+
3066+        d = defer.succeed(None)
3067+        # This should be enough to get us the encoding parameters,
3068+        # offset table, and everything else we need to build a verinfo
3069+        # string.
3070+        d.addCallback(_make_mr, 107)
3071+        d.addCallback(lambda mr:
3072+            mr.get_verinfo())
3073+        def _check_verinfo(verinfo):
3074+            self.failUnless(verinfo)
3075+            self.failUnlessEqual(len(verinfo), 9)
3076+            (seqnum,
3077+             root_hash,
3078+             salt,
3079+             segsize,
3080+             datalen,
3081+             k,
3082+             n,
3083+             prefix,
3084+             offsets) = verinfo
3085+            self.failUnlessEqual(seqnum, 0)
3086+            self.failUnlessEqual(root_hash, self.root_hash)
3087+            self.failUnlessEqual(salt, self.salt)
3088+            self.failUnlessEqual(segsize, 36)
3089+            self.failUnlessEqual(datalen, 36)
3090+            self.failUnlessEqual(k, 3)
3091+            self.failUnlessEqual(n, 10)
3092+            expected_prefix = struct.pack(SIGNED_PREFIX,
3093+                                          0,
3094+                                          seqnum,
3095+                                          root_hash,
3096+                                          salt,
3097+                                          k,
3098+                                          n,
3099+                                          segsize,
3100+                                          datalen)
3101+            self.failUnlessEqual(expected_prefix, prefix)
3102+            self.failUnlessEqual(self.rref.read_count, 0)
3103+        d.addCallback(_check_verinfo)
3104+        # This shouldn't be enough to read any share data.
3105+        d.addCallback(_make_mr, 107)
3106+        d.addCallback(lambda mr:
3107+            mr.get_block_and_salt(0))
3108+        def _check_block_and_salt((block, salt)):
3109+            self.failUnlessEqual(block, self.block * 6)
3110+            self.failUnlessEqual(salt, self.salt)
3111+            # TODO: Fix the read routine so that it reads only the data
3112+            #       that it has cached if it can't read all of it.
3113+            self.failUnlessEqual(self.rref.read_count, 2)
3114+
3115+        # This should be enough to read share data.
3116+        d.addCallback(_make_mr, self.offsets['share_data'])
3117+        d.addCallback(lambda mr:
3118+            mr.get_block_and_salt(0))
3119+        d.addCallback(_check_block_and_salt)
3120+        return d
3121+
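 For comparison with the MDMF case, a small check of why 107 bytes is used
 above (illustrative; the split between prefix and offset table is an
 inference, not stated by the patch):

    import struct
    prefix_size = struct.calcsize(">BQ32s16s BBQQ")   # 75-byte signed SDMF prefix
    offset_table = 107 - prefix_size                  # 32 bytes, presumably the offsets
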
3122+
3123+    def test_read_with_empty_mdmf_file(self):
3124+        # Some tests upload a file with no contents to test things
3125+        # unrelated to the actual handling of the content of the file.
3126+        # The reader should behave intelligently in these cases.
3127+        self.write_test_share_to_server("si1", empty=True)
3128+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3129+        # We should be able to get the encoding parameters, and they
3130+        # should be correct.
3131+        d = defer.succeed(None)
3132+        d.addCallback(lambda ignored:
3133+            mr.get_encoding_parameters())
3134+        def _check_encoding_parameters(params):
3135+            self.failUnlessEqual(len(params), 4)
3136+            k, n, segsize, datalen = params
3137+            self.failUnlessEqual(k, 3)
3138+            self.failUnlessEqual(n, 10)
3139+            self.failUnlessEqual(segsize, 0)
3140+            self.failUnlessEqual(datalen, 0)
3141+        d.addCallback(_check_encoding_parameters)
3142+
3143+        # We should not be able to fetch a block, since there are no
3144+        # blocks to fetch
3145+        d.addCallback(lambda ignored:
3146+            self.shouldFail(LayoutInvalid, "get block on empty file",
3147+                            None,
3148+                            mr.get_block_and_salt, 0))
3149+        return d
3150+
3151+
3152+    def test_read_with_empty_sdmf_file(self):
3153+        self.write_sdmf_share_to_server("si1", empty=True)
3154+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3155+        # We should be able to get the encoding parameters, and they
3156+        # should be correct
3157+        d = defer.succeed(None)
3158+        d.addCallback(lambda ignored:
3159+            mr.get_encoding_parameters())
3160+        def _check_encoding_parameters(params):
3161+            self.failUnlessEqual(len(params), 4)
3162+            k, n, segsize, datalen = params
3163+            self.failUnlessEqual(k, 3)
3164+            self.failUnlessEqual(n, 10)
3165+            self.failUnlessEqual(segsize, 0)
3166+            self.failUnlessEqual(datalen, 0)
3167+        d.addCallback(_check_encoding_parameters)
3168+
3169+        # It does not make sense to get a block in this format, so we
3170+        # should not be able to.
3171+        d.addCallback(lambda ignored:
3172+            self.shouldFail(LayoutInvalid, "get block on an empty file",
3173+                            None,
3174+                            mr.get_block_and_salt, 0))
3175+        return d
3176+
3177+
3178+    def test_verinfo_with_sdmf_file(self):
3179+        self.write_sdmf_share_to_server("si1")
3180+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3181+        # We should be able to get the version information.
3182+        d = defer.succeed(None)
3183+        d.addCallback(lambda ignored:
3184+            mr.get_verinfo())
3185+        def _check_verinfo(verinfo):
3186+            self.failUnless(verinfo)
3187+            self.failUnlessEqual(len(verinfo), 9)
3188+            (seqnum,
3189+             root_hash,
3190+             salt,
3191+             segsize,
3192+             datalen,
3193+             k,
3194+             n,
3195+             prefix,
3196+             offsets) = verinfo
3197+            self.failUnlessEqual(seqnum, 0)
3198+            self.failUnlessEqual(root_hash, self.root_hash)
3199+            self.failUnlessEqual(salt, self.salt)
3200+            self.failUnlessEqual(segsize, 36)
3201+            self.failUnlessEqual(datalen, 36)
3202+            self.failUnlessEqual(k, 3)
3203+            self.failUnlessEqual(n, 10)
3204+            expected_prefix = struct.pack(">BQ32s16s BBQQ",
3205+                                          0,
3206+                                          seqnum,
3207+                                          root_hash,
3208+                                          salt,
3209+                                          k,
3210+                                          n,
3211+                                          segsize,
3212+                                          datalen)
3213+            self.failUnlessEqual(prefix, expected_prefix)
3214+            self.failUnlessEqual(offsets, self.offsets)
3215+        d.addCallback(_check_verinfo)
3216+        return d
3217+
3218+
3219+    def test_verinfo_with_mdmf_file(self):
3220+        self.write_test_share_to_server("si1")
3221+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3222+        d = defer.succeed(None)
3223+        d.addCallback(lambda ignored:
3224+            mr.get_verinfo())
3225+        def _check_verinfo(verinfo):
3226+            self.failUnless(verinfo)
3227+            self.failUnlessEqual(len(verinfo), 9)
3228+            (seqnum,
3229+             root_hash,
3230+             salt_hash,
3231+             segsize,
3232+             datalen,
3233+             k,
3234+             n,
3235+             prefix,
3236+             offsets) = verinfo
3237+            self.failUnlessEqual(seqnum, 0)
3238+            self.failUnlessEqual(root_hash, self.root_hash)
3239+            self.failUnlessEqual(salt_hash, self.salt_hash)
3240+            self.failUnlessEqual(segsize, 6)
3241+            self.failUnlessEqual(datalen, 36)
3242+            self.failUnlessEqual(k, 3)
3243+            self.failUnlessEqual(n, 10)
3244+            expected_prefix = struct.pack(">BQ32s32s BBQQ",
3245+                                          1,
3246+                                          seqnum,
3247+                                          root_hash,
3248+                                          salt_hash,
3249+                                          k,
3250+                                          n,
3251+                                          segsize,
3252+                                          datalen)
3253+            self.failUnlessEqual(prefix, expected_prefix)
3254+            self.failUnlessEqual(offsets, self.offsets)
3255+        d.addCallback(_check_verinfo)
3256+        return d
3257+
3258+
3259+    def test_reader_queue(self):
3260+        self.write_test_share_to_server('si1')
3261+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3262+        d1 = mr.get_block_and_salt(0, queue=True)
3263+        d2 = mr.get_blockhashes(queue=True)
3264+        d3 = mr.get_salthashes(queue=True)
3265+        d4 = mr.get_sharehashes(queue=True)
3266+        d5 = mr.get_signature(queue=True)
3267+        d6 = mr.get_verification_key(queue=True)
3268+        dl = defer.DeferredList([d1, d2, d3, d4, d5, d6])
3269+        mr.flush()
3270+        def _print(results):
3271+            self.failUnlessEqual(len(results), 6)
3272+            # We have one read for version information, one for offsets, and
3273+            # one for everything else.
3274+            self.failUnlessEqual(self.rref.read_count, 3)
3275+            block, salt = results[0][1] # results[0][0] is a boolean that
3276+                                           # says whether or not the operation
3277+                                           # worked.
3278+            self.failUnlessEqual(self.block, block)
3279+            self.failUnlessEqual(self.salt, salt)
3280+
3281+            blockhashes = results[1][1]
3282+            self.failUnlessEqual(self.block_hash_tree, blockhashes)
3283+
3284+            salthashes = results[2][1]
3285+            self.failUnlessEqual(self.salt_hash_tree[1:], salthashes)
3286+
3287+            sharehashes = results[3][1]
3288+            self.failUnlessEqual(self.share_hash_chain, sharehashes)
3289+
3290+            signature = results[4][1]
3291+            self.failUnlessEqual(self.signature, signature)
3292+
3293+            verification_key = results[5][1]
3294+            self.failUnlessEqual(self.verification_key, verification_key)
3295+        dl.addCallback(_print)
3296+        return dl
3297+
3298+
3299 class Stats(unittest.TestCase):
3300 
3301     def setUp(self):
3302}
3303[Write a segmented mutable downloader
3304Kevan Carstensen <kevan@isnotajoke.com>**20100624233843
3305 Ignore-this: a9689b403db93c89b5d943f5aaee573e
3306 
3307 The segmented mutable downloader can deal with MDMF files (files with
3308 one or more segments in MDMF format) and SDMF files (files with one
3309 segment in SDMF format). It is backwards compatible with the old
3310 file format.
3311 
3312 This patch also contains tests for the segmented mutable downloader.
3313] {
3314hunk ./src/allmydata/mutable/retrieve.py 9
3315 from twisted.python import failure
3316 from foolscap.api import DeadReferenceError, eventually, fireEventually
3317 from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError
3318-from allmydata.util import hashutil, idlib, log
3319+from allmydata.util import hashutil, idlib, log, mathutil
3320 from allmydata import hashtree, codec
3321 from allmydata.storage.server import si_b2a
3322 from pycryptopp.cipher.aes import AES
3323hunk ./src/allmydata/mutable/retrieve.py 16
3324 from pycryptopp.publickey import rsa
3325 
3326 from allmydata.mutable.common import DictOfSets, CorruptShareError, UncoordinatedWriteError
3327-from allmydata.mutable.layout import SIGNED_PREFIX, unpack_share_data
3328+from allmydata.mutable.layout import SIGNED_PREFIX, unpack_share_data, \
3329+                                     MDMFSlotReadProxy
3330 
3331 class RetrieveStatus:
3332     implements(IRetrieveStatus)
3333hunk ./src/allmydata/mutable/retrieve.py 103
3334         self.verinfo = verinfo
3335         # during repair, we may be called upon to grab the private key, since
3336         # it wasn't picked up during a verify=False checker run, and we'll
3337-        # need it for repair to generate the a new version.
3338+        # need it for repair to generate a new version.
3339         self._need_privkey = fetch_privkey
3340         if self._node.get_privkey():
3341             self._need_privkey = False
3342hunk ./src/allmydata/mutable/retrieve.py 108
3343 
3344+        if self._need_privkey:
3345+            # TODO: Evaluate the need for this. We'll use it if we want
3346+            # to limit how many queries are on the wire for the privkey
3347+            # at once.
3348+            self._privkey_query_markers = [] # one Marker for each time we've
3349+                                             # tried to get the privkey.
3350+
3351         self._status = RetrieveStatus()
3352         self._status.set_storage_index(self._storage_index)
3353         self._status.set_helper(False)
3354hunk ./src/allmydata/mutable/retrieve.py 124
3355          offsets_tuple) = self.verinfo
3356         self._status.set_size(datalength)
3357         self._status.set_encoding(k, N)
3358+        self.readers = {}
3359 
3360     def get_status(self):
3361         return self._status
3362hunk ./src/allmydata/mutable/retrieve.py 148
3363         self.remaining_sharemap = DictOfSets()
3364         for (shnum, peerid, timestamp) in shares:
3365             self.remaining_sharemap.add(shnum, peerid)
3366+            # If the servermap update fetched anything, it fetched at least 1
3367+            # KiB, so we ask for that much.
3368+            # TODO: Change the cache methods to allow us to fetch all of the
3369+            # data that they have, then change this method to do that.
3370+            any_cache, timestamp = self._node._read_from_cache(self.verinfo,
3371+                                                               shnum,
3372+                                                               0,
3373+                                                               1000)
3374+            ss = self.servermap.connections[peerid]
3375+            reader = MDMFSlotReadProxy(ss,
3376+                                       self._storage_index,
3377+                                       shnum,
3378+                                       any_cache)
3379+            reader.peerid = peerid
3380+            self.readers[shnum] = reader
3381+
3382 
3383         self.shares = {} # maps shnum to validated blocks
3384hunk ./src/allmydata/mutable/retrieve.py 166
3385+        self._active_readers = [] # list of active readers for this dl.
3386+        self._validated_readers = set() # set of readers that we have
3387+                                        # validated the prefix of
3388+        self._block_hash_trees = {} # shnum => hashtree
3389+        # TODO: Make this into a file-backed consumer or something to
3390+        # conserve memory.
3391+        self._plaintext = ""
3392 
3393         # how many shares do we need?
3394hunk ./src/allmydata/mutable/retrieve.py 175
3395-        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
3396+        (seqnum,
3397+         root_hash,
3398+         IV,
3399+         segsize,
3400+         datalength,
3401+         k,
3402+         N,
3403+         prefix,
3404          offsets_tuple) = self.verinfo
3405hunk ./src/allmydata/mutable/retrieve.py 184
3406-        assert len(self.remaining_sharemap) >= k
3407-        # we start with the lowest shnums we have available, since FEC is
3408-        # faster if we're using "primary shares"
3409-        self.active_shnums = set(sorted(self.remaining_sharemap.keys())[:k])
3410-        for shnum in self.active_shnums:
3411-            # we use an arbitrary peer who has the share. If shares are
3412-            # doubled up (more than one share per peer), we could make this
3413-            # run faster by spreading the load among multiple peers. But the
3414-            # algorithm to do that is more complicated than I want to write
3415-            # right now, and a well-provisioned grid shouldn't have multiple
3416-            # shares per peer.
3417-            peerid = list(self.remaining_sharemap[shnum])[0]
3418-            self.get_data(shnum, peerid)
3419 
3420hunk ./src/allmydata/mutable/retrieve.py 185
3421-        # control flow beyond this point: state machine. Receiving responses
3422-        # from queries is the input. We might send out more queries, or we
3423-        # might produce a result.
3424 
3425hunk ./src/allmydata/mutable/retrieve.py 186
3426+        # We need one share hash tree for the entire file; its leaves
3427+        # are the roots of the block hash trees for the shares that
3428+        # comprise it, and its root is in the verinfo.
3429+        self.share_hash_tree = hashtree.IncompleteHashTree(N)
3430+        self.share_hash_tree.set_hashes({0: root_hash})
3431+
3432+        # This will set up both the segment decoder and the tail segment
3433+        # decoder, as well as a variety of other instance variables that
3434+        # the download process will use.
3435+        self._setup_encoding_parameters()
3436+        assert len(self.remaining_sharemap) >= k
3437+
3438+        self.log("starting download")
3439+        self._add_active_peers()
3440+        # The download process beyond this is a state machine.
3441+        # _add_active_peers will select the peers that we want to use
3442+        # for the download, and then attempt to start downloading. After
3443+        # each segment, it will check for doneness, reacting to broken
3444+        # peers and corrupt shares as necessary. If it runs out of good
3445+        # peers before downloading all of the segments, _done_deferred
3446+        # will errback.  Otherwise, it will eventually callback with the
3447+        # contents of the mutable file.
3448         return self._done_deferred
3449 
3450hunk ./src/allmydata/mutable/retrieve.py 210
3451-    def get_data(self, shnum, peerid):
3452-        self.log(format="sending sh#%(shnum)d request to [%(peerid)s]",
3453-                 shnum=shnum,
3454-                 peerid=idlib.shortnodeid_b2a(peerid),
3455-                 level=log.NOISY)
3456-        ss = self.servermap.connections[peerid]
3457-        started = time.time()
3458-        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
3459+
3460+    def _setup_encoding_parameters(self):
3461+        """
3462+        I set up the encoding parameters, including k, n, the number
3463+        of segments associated with this file, and the segment decoder.
3464+        """
3465+        (seqnum,
3466+         root_hash,
3467+         IV,
3468+         segsize,
3469+         datalength,
3470+         k,
3471+         n,
3472+         known_prefix,
3473          offsets_tuple) = self.verinfo
3474hunk ./src/allmydata/mutable/retrieve.py 225
3475-        offsets = dict(offsets_tuple)
3476+        self._required_shares = k
3477+        self._total_shares = n
3478+        self._segment_size = segsize
3479+        self._data_length = datalength
3480+        if datalength and segsize:
3481+            self._num_segments = mathutil.div_ceil(datalength, segsize)
3482+            self._tail_data_size = datalength % segsize
3483+        else:
3484+            self._num_segments = 0
3485+            self._tail_data_size = 0
3486 
3487hunk ./src/allmydata/mutable/retrieve.py 236
3488-        # we read the checkstring, to make sure that the data we grab is from
3489-        # the right version.
3490-        readv = [ (0, struct.calcsize(SIGNED_PREFIX)) ]
3491+        self._segment_decoder = codec.CRSDecoder()
3492+        self._segment_decoder.set_params(segsize, k, n)
3493+        self._current_segment = 0
3494 
3495hunk ./src/allmydata/mutable/retrieve.py 240
3496-        # We also read the data, and the hashes necessary to validate them
3497-        # (share_hash_chain, block_hash_tree, share_data). We don't read the
3498-        # signature or the pubkey, since that was handled during the
3499-        # servermap phase, and we'll be comparing the share hash chain
3500-        # against the roothash that was validated back then.
3501+        if not self._tail_data_size:
3502+            self._tail_data_size = segsize
3503 
3504hunk ./src/allmydata/mutable/retrieve.py 243
3505-        readv.append( (offsets['share_hash_chain'],
3506-                       offsets['enc_privkey'] - offsets['share_hash_chain'] ) )
3507+        self._tail_segment_size = mathutil.next_multiple(self._tail_data_size,
3508+                                                         self._required_shares)
3509+        if self._tail_segment_size == self._segment_size:
3510+            self._tail_decoder = self._segment_decoder
3511+        else:
3512+            self._tail_decoder = codec.CRSDecoder()
3513+            self._tail_decoder.set_params(self._tail_segment_size,
3514+                                          self._required_shares,
3515+                                          self._total_shares)
3516 
3517hunk ./src/allmydata/mutable/retrieve.py 253
3518-        # if we need the private key (for repair), we also fetch that
3519-        if self._need_privkey:
3520-            readv.append( (offsets['enc_privkey'],
3521-                           offsets['EOF'] - offsets['enc_privkey']) )
3522+        self.log("got encoding parameters: "
3523+                 "k: %d "
3524+                 "n: %d "
3525+                 "%d segments of %d bytes each (%d byte tail segment)" % \
3526+                 (k, n, self._num_segments, self._segment_size,
3527+                  self._tail_segment_size))
3528 
3529hunk ./src/allmydata/mutable/retrieve.py 260
3530-        m = Marker()
3531-        self._outstanding_queries[m] = (peerid, shnum, started)
3532+        for i in xrange(self._total_shares):
3533+            # So we don't have to do this later.
3534+            self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)
3535 
3536hunk ./src/allmydata/mutable/retrieve.py 264
3537-        # ask the cache first
3538-        got_from_cache = False
3539-        datavs = []
3540-        for (offset, length) in readv:
3541-            (data, timestamp) = self._node._read_from_cache(self.verinfo, shnum,
3542-                                                            offset, length)
3543-            if data is not None:
3544-                datavs.append(data)
3545-        if len(datavs) == len(readv):
3546-            self.log("got data from cache")
3547-            got_from_cache = True
3548-            d = fireEventually({shnum: datavs})
3549-            # datavs is a dict mapping shnum to a pair of strings
3550-        else:
3551-            d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
3552-        self.remaining_sharemap.discard(shnum, peerid)
3553+        # If we have more than one segment, we are an MDMF file, which
3554+        # means that we need to validate the salts as we receive them.
3555+        self._salt_hash_tree = hashtree.IncompleteHashTree(self._num_segments)
3556+        self._salt_hash_tree[0] = IV # from the prefix.
3557 
3558hunk ./src/allmydata/mutable/retrieve.py 269
3559-        d.addCallback(self._got_results, m, peerid, started, got_from_cache)
3560-        d.addErrback(self._query_failed, m, peerid)
3561-        # errors that aren't handled by _query_failed (and errors caused by
3562-        # _query_failed) get logged, but we still want to check for doneness.
3563-        def _oops(f):
3564-            self.log(format="problem in _query_failed for sh#%(shnum)d to %(peerid)s",
3565-                     shnum=shnum,
3566-                     peerid=idlib.shortnodeid_b2a(peerid),
3567-                     failure=f,
3568-                     level=log.WEIRD, umid="W0xnQA")
3569-        d.addErrback(_oops)
3570-        d.addBoth(self._check_for_done)
3571-        # any error during _check_for_done means the download fails. If the
3572-        # download is successful, _check_for_done will fire _done by itself.
3573-        d.addErrback(self._done)
3574-        d.addErrback(log.err)
3575-        return d # purely for testing convenience
3576 
3577hunk ./src/allmydata/mutable/retrieve.py 270
3578-    def _do_read(self, ss, peerid, storage_index, shnums, readv):
3579-        # isolate the callRemote to a separate method, so tests can subclass
3580-        # Publish and override it
3581-        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
3582-        return d
3583+    def _add_active_peers(self):
3584+        """
3585+        I populate self._active_readers with enough active readers to
3586+        retrieve the contents of this mutable file. I am called before
3587+        downloading starts, and (eventually) after each validation
3588+        error, connection error, or other problem in the download.
3589+        """
3590+        # TODO: It would be cool to investigate other heuristics for
3591+        # reader selection. For instance, the cost (in time the user
3592+        # spends waiting for their file) of selecting a really slow peer
3593+        # that happens to have a primary share is probably more than
3594+        # selecting a really fast peer that doesn't have a primary
3595+        # share. Maybe the servermap could be extended to provide this
3596+        # information; it could keep track of latency information while
3597+        # it gathers more important data, and then this routine could
3598+        # use that to select active readers.
3599+        #
3600+        # (these and other questions would be easier to answer with a
3601+        #  robust, configurable tahoe-lafs simulator, which modeled node
3602+        #  failures, differences in node speed, and other characteristics
3603+        #  that we expect storage servers to have.  You could have
3604+        #  presets for really stable grids (like allmydata.com),
3605+        #  friendnets, make it easy to configure your own settings, and
3606+        #  then simulate the effect of big changes on these use cases
3607+        #  instead of just reasoning about what the effect might be. Out
3608+        #  of scope for MDMF, though.)
3609 
3610hunk ./src/allmydata/mutable/retrieve.py 297
3611-    def remove_peer(self, peerid):
3612-        for shnum in list(self.remaining_sharemap.keys()):
3613-            self.remaining_sharemap.discard(shnum, peerid)
3614+        # We need at least self._required_shares readers to download a
3615+        # segment.
3616+        needed = self._required_shares - len(self._active_readers)
3617+        # XXX: Why don't format= log messages work here?
3618+        self.log("adding %d peers to the active peers list" % needed)
3619 
3620hunk ./src/allmydata/mutable/retrieve.py 303
3621-    def _got_results(self, datavs, marker, peerid, started, got_from_cache):
3622-        now = time.time()
3623-        elapsed = now - started
3624-        if not got_from_cache:
3625-            self._status.add_fetch_timing(peerid, elapsed)
3626-        self.log(format="got results (%(shares)d shares) from [%(peerid)s]",
3627-                 shares=len(datavs),
3628-                 peerid=idlib.shortnodeid_b2a(peerid),
3629-                 level=log.NOISY)
3630-        self._outstanding_queries.pop(marker, None)
3631-        if not self._running:
3632-            return
3633+        # We favor lower numbered shares, since FEC is faster with
3634+        # primary shares than with other shares, and lower-numbered
3635+        # shares are more likely to be primary than higher numbered
3636+        # shares.
3637+        active_shnums = sorted(self.remaining_sharemap.keys())
3638+        active_shnums = active_shnums[:needed]
3639+        if len(active_shnums) < needed:
3640+            # We don't have enough readers to retrieve the file; fail.
3641+            return self._failed()
3642 
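For reference, a minimal standalone sketch (not part of the patch; the helper name and sample data are made up) of the selection heuristic described in the comment above: take the lowest-numbered shares first, since those are the most likely to be primary shares and therefore the cheapest to decode.

    def pick_lowest_shnums(remaining_sharemap, needed):
        """Return up to `needed` share numbers, lowest (likely primary) first."""
        return sorted(remaining_sharemap.keys())[:needed]

    # e.g. for a 3-of-10 encoding where peers hold shares 7, 0 and 2:
    example_sharemap = {7: set(["peer-a"]), 0: set(["peer-b"]), 2: set(["peer-c"])}
    assert pick_lowest_shnums(example_sharemap, 2) == [0, 2]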
3643hunk ./src/allmydata/mutable/retrieve.py 313
3644-        # note that we only ask for a single share per query, so we only
3645-        # expect a single share back. On the other hand, we use the extra
3646-        # shares if we get them.. seems better than an assert().
3647+        for shnum in active_shnums:
3648+            self._active_readers.append(self.readers[shnum])
3649+            self.log("added reader for share %d" % shnum)
3650+        assert len(self._active_readers) == self._required_shares
3651+        # Conceptually, this is part of the _add_active_peers step. It
3652+        # validates the prefixes of newly added readers to make sure
3653+        # that they match what we are expecting for self.verinfo. If
3654+        # validation is successful, _validate_active_prefixes will call
3655+        # _download_current_segment for us. If validation is
3656+        # unsuccessful, then _validate_active_prefixes will remove the peer
3657+        # and call _add_active_peers again, where we will attempt to rectify
3658+        # the problem by choosing another peer.
3659+        return self._validate_active_prefixes()
3660 
3661hunk ./src/allmydata/mutable/retrieve.py 327
3662-        for shnum,datav in datavs.items():
3663-            (prefix, hash_and_data) = datav[:2]
3664-            try:
3665-                self._got_results_one_share(shnum, peerid,
3666-                                            prefix, hash_and_data)
3667-            except CorruptShareError, e:
3668-                # log it and give the other shares a chance to be processed
3669-                f = failure.Failure()
3670-                self.log(format="bad share: %(f_value)s",
3671-                         f_value=str(f.value), failure=f,
3672-                         level=log.WEIRD, umid="7fzWZw")
3673-                self.notify_server_corruption(peerid, shnum, str(e))
3674-                self.remove_peer(peerid)
3675-                self.servermap.mark_bad_share(peerid, shnum, prefix)
3676-                self._bad_shares.add( (peerid, shnum) )
3677-                self._status.problems[peerid] = f
3678-                self._last_failure = f
3679-                pass
3680-            if self._need_privkey and len(datav) > 2:
3681-                lp = None
3682-                self._try_to_validate_privkey(datav[2], peerid, shnum, lp)
3683-        # all done!
3684 
3685hunk ./src/allmydata/mutable/retrieve.py 328
3686-    def notify_server_corruption(self, peerid, shnum, reason):
3687-        ss = self.servermap.connections[peerid]
3688-        ss.callRemoteOnly("advise_corrupt_share",
3689-                          "mutable", self._storage_index, shnum, reason)
3690+    def _validate_active_prefixes(self):
3691+        """
3692+        I check to make sure that the prefixes on the peers that I am
3693+        currently reading from match the prefix that we expect to see,
3694+        as recorded in self.verinfo.
3695 
3696hunk ./src/allmydata/mutable/retrieve.py 334
3697-    def _got_results_one_share(self, shnum, peerid,
3698-                               got_prefix, got_hash_and_data):
3699-        self.log("_got_results: got shnum #%d from peerid %s"
3700-                 % (shnum, idlib.shortnodeid_b2a(peerid)))
3701-        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
3702+        If I find that all of the active peers have acceptable prefixes,
3703+        I pass control to _download_current_segment, which will use
3704+        those peers to do cool things. If I find that some of the active
3705+        peers have unacceptable prefixes, I will remove them from active
3706+        peers (and from further consideration) and call
3707+        _add_active_peers to attempt to rectify the situation. I keep
3708+        track of which peers I have already validated so that I don't
3709+        need to do so again.
3710+        """
3711+        assert self._active_readers, "No more active readers"
3712+
3713+        ds = []
3714+        new_readers = list(set(self._active_readers) - self._validated_readers)
3715+        self.log('validating %d newly-added active readers' % len(new_readers))
3716+
3717+        for reader in new_readers:
3718+            # We force a remote read here -- otherwise, we are relying
3719+            # on cached data that we already verified as valid, and we
3720+            # won't detect an uncoordinated write that has occurred
3721+            # since the last servermap update.
3722+            d = reader.get_prefix(force_remote=True)
3723+            d.addCallback(self._try_to_validate_prefix, reader)
3724+            ds.append(d)
3725+        dl = defer.DeferredList(ds, consumeErrors=True)
3726+        def _check_results(results):
3727+            # Each result in results will be of the form (success, value),
3728+            # where value is a Failure on error. success tells us whether
3729+            # or not the checkstring validated. If it didn't, we need to
3730+            # remove the offending (peer,share) from our active readers,
3731+            # and ensure that active readers is again populated.
3732+            bad_readers = []
3733+            for i, result in enumerate(results):
3734+                if not result[0]:
3735+                    reader = new_readers[i]
3736+                    f = result[1]
3737+                    assert isinstance(f, failure.Failure)
3738+
3739+                    self.log("The reader %s failed to "
3740+                             "properly validate: %s" % \
3741+                             (reader, str(f.value)))
3742+                    bad_readers.append((reader, f))
3743+                else:
3744+                    reader = new_readers[i]
3745+                    self.log("the reader %s checks out, so we'll use it" % \
3746+                             reader)
3747+                    self._validated_readers.add(reader)
3748+                    # Each time we validate a reader, we check to see if
3749+                    # we need the private key. If we do, we politely ask
3750+                    # for it and then continue computing. If we find
3751+                    # that we haven't gotten it at the end of
3752+                    # segment decoding, then we'll take more drastic
3753+                    # measures.
3754+                    if self._need_privkey:
3755+                        d = reader.get_encprivkey()
3756+                        d.addCallback(self._try_to_validate_privkey, reader)
3757+            if bad_readers:
3758+                # We do them all at once, or else we screw up list indexing.
3759+                for (reader, f) in bad_readers:
3760+                    self._mark_bad_share(reader, f)
3761+                return self._add_active_peers()
3762+            else:
3763+                return self._download_current_segment()
3764+            # The next step will assert that it has enough active
3765+            # readers to fetch shares; here we just remove the bad ones.
3766+        dl.addCallback(_check_results)
3767+        return dl
3768+
3769+
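As a side note, the (success, value) pairs that _check_results walks through come from defer.DeferredList(..., consumeErrors=True), which collects one outcome per deferred instead of failing fast. A rough standalone sketch of that partitioning step (names here are illustrative, not the patch's API):

    def partition_readers(results, readers):
        """Split readers into (good, bad) given DeferredList-style results.

        `results` is a list of (success, value) pairs, as produced by
        defer.DeferredList(..., consumeErrors=True); when success is False,
        value is a twisted.python.failure.Failure.
        """
        good, bad = [], []
        for reader, (success, value) in zip(readers, results):
            if success:
                good.append(reader)
            else:
                bad.append((reader, value))
        return good, bad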
3770+    def _try_to_validate_prefix(self, prefix, reader):
3771+        """
3772+        I check that the prefix returned by a candidate server for
3773+        retrieval matches the prefix that the servermap knows about
3774+        (and, hence, the prefix that was validated earlier). If it does,
3775+        I return without incident, approving the candidate server for
3776+        segment retrieval. If it doesn't, I raise UncoordinatedWriteError,
3777+        which means that another server must be chosen.
3778+        """
3779+        (seqnum,
3780+         root_hash,
3781+         IV,
3782+         segsize,
3783+         datalength,
3784+         k,
3785+         N,
3786+         known_prefix,
3787          offsets_tuple) = self.verinfo
3788hunk ./src/allmydata/mutable/retrieve.py 420
3789-        assert len(got_prefix) == len(prefix), (len(got_prefix), len(prefix))
3790-        if got_prefix != prefix:
3791-            msg = "someone wrote to the data since we read the servermap: prefix changed"
3792-            raise UncoordinatedWriteError(msg)
3793-        (share_hash_chain, block_hash_tree,
3794-         share_data) = unpack_share_data(self.verinfo, got_hash_and_data)
3795+        if known_prefix != prefix:
3796+            self.log("prefix from share %d doesn't match" % reader.shnum)
3797+            raise UncoordinatedWriteError("Mismatched prefix -- this could "
3798+                                          "indicate an uncoordinated write")
3799+        # Otherwise, we're okay -- no issues.
3800 
3801hunk ./src/allmydata/mutable/retrieve.py 426
3802-        assert isinstance(share_data, str)
3803-        # build the block hash tree. SDMF has only one leaf.
3804-        leaves = [hashutil.block_hash(share_data)]
3805-        t = hashtree.HashTree(leaves)
3806-        if list(t) != block_hash_tree:
3807-            raise CorruptShareError(peerid, shnum, "block hash tree failure")
3808-        share_hash_leaf = t[0]
3809-        t2 = hashtree.IncompleteHashTree(N)
3810-        # root_hash was checked by the signature
3811-        t2.set_hashes({0: root_hash})
3812-        try:
3813-            t2.set_hashes(hashes=share_hash_chain,
3814-                          leaves={shnum: share_hash_leaf})
3815-        except (hashtree.BadHashError, hashtree.NotEnoughHashesError,
3816-                IndexError), e:
3817-            msg = "corrupt hashes: %s" % (e,)
3818-            raise CorruptShareError(peerid, shnum, msg)
3819-        self.log(" data valid! len=%d" % len(share_data))
3820-        # each query comes down to this: placing validated share data into
3821-        # self.shares
3822-        self.shares[shnum] = share_data
3823 
3824hunk ./src/allmydata/mutable/retrieve.py 427
3825-    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
3826+    def _remove_reader(self, reader):
3827+        """
3828+        At various points, we will wish to remove a peer from
3829+        consideration and/or use. These include, but are not necessarily
3830+        limited to:
3831 
3832hunk ./src/allmydata/mutable/retrieve.py 433
3833-        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
3834-        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
3835-        if alleged_writekey != self._node.get_writekey():
3836-            self.log("invalid privkey from %s shnum %d" %
3837-                     (idlib.nodeid_b2a(peerid)[:8], shnum),
3838-                     parent=lp, level=log.WEIRD, umid="YIw4tA")
3839-            return
3840+            - A connection error.
3841+            - A mismatched prefix (that is, a prefix that does not match
3842+              our conception of the version information string).
3843+            - A failing block hash, salt hash, or share hash, which can
3844+              indicate disk failure/bit flips, or network trouble.
3845 
3846hunk ./src/allmydata/mutable/retrieve.py 439
3847-        # it's good
3848-        self.log("got valid privkey from shnum %d on peerid %s" %
3849-                 (shnum, idlib.shortnodeid_b2a(peerid)),
3850-                 parent=lp)
3851-        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
3852-        self._node._populate_encprivkey(enc_privkey)
3853-        self._node._populate_privkey(privkey)
3854-        self._need_privkey = False
3855+        This method will do that. I will make sure that the
3856+        (shnum,reader) combination represented by my reader argument is
3857+        not used for anything else during this download. I will not
3858+        advise the reader of any corruption, something that my callers
3859+        may wish to do on their own.
3860+        """
3861+        # TODO: When you're done writing this, see if this is ever
3862+        # actually used for something that _mark_bad_share isn't. I have
3863+        # a feeling that they will be used for very similar things, and
3864+        # that having them both here is just going to be an epic amount
3865+        # of code duplication.
3866+        #
3867+        # (well, okay, not epic, but meaningful)
3868+        self.log("removing reader %s" % reader)
3869+        # Remove the reader from _active_readers
3870+        self._active_readers.remove(reader)
3871+        # TODO: self.readers.remove(reader)?
3872+        for shnum in list(self.remaining_sharemap.keys()):
3873+            self.remaining_sharemap.discard(shnum, reader.peerid)
3874 
3875hunk ./src/allmydata/mutable/retrieve.py 459
3876-    def _query_failed(self, f, marker, peerid):
3877-        self.log(format="query to [%(peerid)s] failed",
3878-                 peerid=idlib.shortnodeid_b2a(peerid),
3879-                 level=log.NOISY)
3880-        self._status.problems[peerid] = f
3881-        self._outstanding_queries.pop(marker, None)
3882-        if not self._running:
3883-            return
3884-        self._last_failure = f
3885-        self.remove_peer(peerid)
3886-        level = log.WEIRD
3887-        if f.check(DeadReferenceError):
3888-            level = log.UNUSUAL
3889-        self.log(format="error during query: %(f_value)s",
3890-                 f_value=str(f.value), failure=f, level=level, umid="gOJB5g")
3891 
3892hunk ./src/allmydata/mutable/retrieve.py 460
3893-    def _check_for_done(self, res):
3894-        # exit paths:
3895-        #  return : keep waiting, no new queries
3896-        #  return self._send_more_queries(outstanding) : send some more queries
3897-        #  fire self._done(plaintext) : download successful
3898-        #  raise exception : download fails
3899+    def _mark_bad_share(self, reader, f):
3900+        """
3901+        I mark the (peerid, shnum) encapsulated by my reader argument as
3902+        a bad share, which means that it will not be used anywhere else.
3903 
3904hunk ./src/allmydata/mutable/retrieve.py 465
3905-        self.log(format="_check_for_done: running=%(running)s, decoding=%(decoding)s",
3906-                 running=self._running, decoding=self._decoding,
3907-                 level=log.NOISY)
3908-        if not self._running:
3909-            return
3910-        if self._decoding:
3911-            return
3912-        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
3913-         offsets_tuple) = self.verinfo
3914+        There are several reasons to want to mark something as a bad
3915+        share. These include:
3916 
3917hunk ./src/allmydata/mutable/retrieve.py 468
3918-        if len(self.shares) < k:
3919-            # we don't have enough shares yet
3920-            return self._maybe_send_more_queries(k)
3921-        if self._need_privkey:
3922-            # we got k shares, but none of them had a valid privkey. TODO:
3923-            # look further. Adding code to do this is a bit complicated, and
3924-            # I want to avoid that complication, and this should be pretty
3925-            # rare (k shares with bitflips in the enc_privkey but not in the
3926-            # data blocks). If we actually do get here, the subsequent repair
3927-            # will fail for lack of a privkey.
3928-            self.log("got k shares but still need_privkey, bummer",
3929-                     level=log.WEIRD, umid="MdRHPA")
3930+            - A connection error to the peer.
3931+            - A mismatched prefix (that is, a prefix that does not match
3932+              our local conception of the version information string).
3933+            - A failing block hash, salt hash, share hash, or other
3934+              integrity check.
3935 
3936hunk ./src/allmydata/mutable/retrieve.py 474
3937-        # we have enough to finish. All the shares have had their hashes
3938-        # checked, so if something fails at this point, we don't know how
3939-        # to fix it, so the download will fail.
3940+        This method will ensure that readers that we wish to mark bad
3941+        (for these reasons or other reasons) are not used for the rest
3942+        of the download. Additionally, it will attempt to tell the
3943+        remote peer (with no guarantee of success) that its share is
3944+        corrupt.
3945+        """
3946+        self.log("marking share %d on server %s as bad" % \
3947+                 (reader.shnum, reader))
3948+        self._remove_reader(reader)
3949+        self._bad_shares.add((reader.peerid, reader.shnum))
3950+        self._status.problems[reader.peerid] = f
3951+        self._last_failure = f
3952+        self.notify_server_corruption(reader.peerid, reader.shnum,
3953+                                      str(f.value))
3954 
3955hunk ./src/allmydata/mutable/retrieve.py 489
3956-        self._decoding = True # avoid reentrancy
3957-        self._status.set_status("decoding")
3958-        now = time.time()
3959-        elapsed = now - self._started
3960-        self._status.timings["fetch"] = elapsed
3961 
3962hunk ./src/allmydata/mutable/retrieve.py 490
3963-        d = defer.maybeDeferred(self._decode)
3964-        d.addCallback(self._decrypt, IV, self._node.get_readkey())
3965-        d.addBoth(self._done)
3966-        return d # purely for test convenience
3967+    def _download_current_segment(self):
3968+        """
3969+        I download, validate, decode, decrypt, and assemble the segment
3970+        that this Retrieve is currently responsible for downloading.
3971+        """
3972+        assert len(self._active_readers) >= self._required_shares
3973+        if self._current_segment < self._num_segments:
3974+            d = self._process_segment(self._current_segment)
3975+        else:
3976+            d = defer.succeed(None)
3977+        d.addCallback(self._check_for_done)
3978+        return d
3979 
3980hunk ./src/allmydata/mutable/retrieve.py 503
3981-    def _maybe_send_more_queries(self, k):
3982-        # we don't have enough shares yet. Should we send out more queries?
3983-        # There are some number of queries outstanding, each for a single
3984-        # share. If we can generate 'needed_shares' additional queries, we do
3985-        # so. If we can't, then we know this file is a goner, and we raise
3986-        # NotEnoughSharesError.
3987-        self.log(format=("_maybe_send_more_queries, have=%(have)d, k=%(k)d, "
3988-                         "outstanding=%(outstanding)d"),
3989-                 have=len(self.shares), k=k,
3990-                 outstanding=len(self._outstanding_queries),
3991-                 level=log.NOISY)
3992 
3993hunk ./src/allmydata/mutable/retrieve.py 504
3994-        remaining_shares = k - len(self.shares)
3995-        needed = remaining_shares - len(self._outstanding_queries)
3996-        if not needed:
3997-            # we have enough queries in flight already
3998+    def _process_segment(self, segnum):
3999+        """
4000+        I download, validate, decode, and decrypt one segment of the
4001+        file that this Retrieve is retrieving. This means coordinating
4002+        the process of getting k blocks of that file, validating them,
4003+        assembling them into one segment with the decoder, and then
4004+        decrypting them.
4005+        """
4006+        self.log("processing segment %d" % segnum)
4007 
4008hunk ./src/allmydata/mutable/retrieve.py 514
4009-            # TODO: but if they've been in flight for a long time, and we
4010-            # have reason to believe that new queries might respond faster
4011-            # (i.e. we've seen other queries come back faster, then consider
4012-            # sending out new queries. This could help with peers which have
4013-            # silently gone away since the servermap was updated, for which
4014-            # we're still waiting for the 15-minute TCP disconnect to happen.
4015-            self.log("enough queries are in flight, no more are needed",
4016-                     level=log.NOISY)
4017-            return
4018+        # TODO: The old code uses a marker. Should this code do that
4019+        # too? What did the Marker do?
4020+        assert len(self._active_readers) >= self._required_shares
4021+
4022+        # We need to ask each of our active readers for its block and
4023+        # salt. We will then validate those. If validation is
4024+        # successful, we will assemble the results into plaintext.
4025+        ds = []
4026+        for reader in self._active_readers:
4027+            d = reader.get_block_and_salt(segnum, queue=True)
4028+            d2 = self._get_needed_hashes(reader, segnum)
4029+            dl = defer.DeferredList([d, d2])
4030+            dl.addCallback(self._validate_block, segnum, reader)
4031+            dl.addErrback(self._validation_or_decoding_failed, [reader])
4032+            ds.append(dl)
4033+            reader.flush()
4034+        dl = defer.DeferredList(ds)
4035+        dl.addCallback(self._maybe_decode_and_decrypt_segment, segnum)
4036+        return dl
4037 
4038hunk ./src/allmydata/mutable/retrieve.py 534
4039-        outstanding_shnums = set([shnum
4040-                                  for (peerid, shnum, started)
4041-                                  in self._outstanding_queries.values()])
4042-        # prefer low-numbered shares, they are more likely to be primary
4043-        available_shnums = sorted(self.remaining_sharemap.keys())
4044-        for shnum in available_shnums:
4045-            if shnum in outstanding_shnums:
4046-                # skip ones that are already in transit
4047-                continue
4048-            if shnum not in self.remaining_sharemap:
4049-                # no servers for that shnum. note that DictOfSets removes
4050-                # empty sets from the dict for us.
4051-                continue
4052-            peerid = list(self.remaining_sharemap[shnum])[0]
4053-            # get_data will remove that peerid from the sharemap, and add the
4054-            # query to self._outstanding_queries
4055-            self._status.set_status("Retrieving More Shares")
4056-            self.get_data(shnum, peerid)
4057-            needed -= 1
4058-            if not needed:
4059+
4060+    def _maybe_decode_and_decrypt_segment(self, blocks_and_salts, segnum):
4061+        """
4062+        I take the results of fetching and validating the blocks from a
4063+        callback chain in another method. If the results are such that
4064+        they tell me that validation and fetching succeeded without
4065+        incident, I will proceed with decoding and decryption.
4066+        Otherwise, I will do nothing.
4067+        """
4068+        self.log("trying to decode and decrypt segment %d" % segnum)
4069+        failures = False
4070+        for block_and_salt in blocks_and_salts:
4071+            if not block_and_salt[0] or block_and_salt[1] is None:
4072+                self.log("some validation operations failed; not proceeding")
4073+                failures = True
4074                 break
4075hunk ./src/allmydata/mutable/retrieve.py 550
4076+        if not failures:
4077+            self.log("everything looks ok, building segment %d" % segnum)
4078+            d = self._decode_blocks(blocks_and_salts, segnum)
4079+            d.addCallback(self._decrypt_segment)
4080+            d.addErrback(self._validation_or_decoding_failed,
4081+                         self._active_readers)
4082+            d.addCallback(self._set_segment)
4083+            return d
4084+        else:
4085+            return defer.succeed(None)
4086+
4087+
4088+    def _set_segment(self, segment):
4089+        """
4090+        Given a plaintext segment, I register that segment with the
4091+        target that is handling the file download.
4092+        """
4093+        self.log("got plaintext for segment %d" % self._current_segment)
4094+        self._plaintext += segment
4095+        self._current_segment += 1
4096 
4097hunk ./src/allmydata/mutable/retrieve.py 571
4098-        # at this point, we have as many outstanding queries as we can. If
4099-        # needed!=0 then we might not have enough to recover the file.
4100-        if needed:
4101-            format = ("ran out of peers: "
4102-                      "have %(have)d shares (k=%(k)d), "
4103-                      "%(outstanding)d queries in flight, "
4104-                      "need %(need)d more, "
4105-                      "found %(bad)d bad shares")
4106-            args = {"have": len(self.shares),
4107-                    "k": k,
4108-                    "outstanding": len(self._outstanding_queries),
4109-                    "need": needed,
4110-                    "bad": len(self._bad_shares),
4111-                    }
4112-            self.log(format=format,
4113-                     level=log.WEIRD, umid="ezTfjw", **args)
4114-            err = NotEnoughSharesError("%s, last failure: %s" %
4115-                                      (format % args, self._last_failure))
4116-            if self._bad_shares:
4117-                self.log("We found some bad shares this pass. You should "
4118-                         "update the servermap and try again to check "
4119-                         "more peers",
4120-                         level=log.WEIRD, umid="EFkOlA")
4121-                err.servermap = self.servermap
4122-            raise err
4123 
4124hunk ./src/allmydata/mutable/retrieve.py 572
4125+    def _validation_or_decoding_failed(self, f, readers):
4126+        """
4127+        I am called when a block or a salt fails to correctly validate, or when
4128+        the decryption or decoding operation fails for some reason.  I react to
4129+        this failure by notifying the remote server of corruption, and then
4130+        removing the remote peer from further activity.
4131+        """
4132+        assert isinstance(readers, list)
4133+        bad_shnums = [reader.shnum for reader in readers]
4134+
4135+        self.log("validation or decoding failed on share(s) %s, peer(s) %s, "
4136+                 "segment %d: %s" % \
4137+                 (bad_shnums, readers, self._current_segment, str(f)))
4138+        for reader in readers:
4139+            self._mark_bad_share(reader, f)
4140         return
4141 
4142hunk ./src/allmydata/mutable/retrieve.py 589
4143-    def _decode(self):
4144-        started = time.time()
4145-        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
4146-         offsets_tuple) = self.verinfo
4147 
4148hunk ./src/allmydata/mutable/retrieve.py 590
4149-        # shares_dict is a dict mapping shnum to share data, but the codec
4150-        # wants two lists.
4151-        shareids = []; shares = []
4152-        for shareid, share in self.shares.items():
4153+    def _validate_block(self, results, segnum, reader):
4154+        """
4155+        I validate a block from one share on a remote server.
4156+        """
4157+        # Grab the part of the block hash tree that is necessary to
4158+        # validate this block, then generate the block hash root.
4159+        self.log("validating share %d for segment %d" % (reader.shnum,
4160+                                                             segnum))
4161+        # Did we fail to fetch either of the things that we were
4162+        # supposed to? Fail if so.
4163+        if not results[0][0] or not results[1][0]:
4164+            # handled by the errback handler.
4165+            raise CorruptShareError(reader.peerid, reader.shnum, "Connection error")
4166+
4167+        block_and_salt, block_and_sharehashes = results
4168+        block, salt = block_and_salt[1]
4169+        blockhashes, sharehashes = block_and_sharehashes[1]
4170+
4171+        blockhashes = dict(enumerate(blockhashes[1]))
4172+        self.log("the reader gave me the following blockhashes: %s" % \
4173+                 blockhashes.keys())
4174+        self.log("the reader gave me the following sharehashes: %s" % \
4175+                 sharehashes[1].keys())
4176+        bht = self._block_hash_trees[reader.shnum]
4177+
4178+        if bht.needed_hashes(segnum, include_leaf=True):
4179+            try:
4180+                bht.set_hashes(blockhashes)
4181+            except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
4182+                    IndexError), e:
4183+                raise CorruptShareError(reader.peerid,
4184+                                        reader.shnum,
4185+                                        "block hash tree failure: %s" % e)
4186+
4187+        blockhash = hashutil.block_hash(block)
4188+        # If this works without an error, then validation is
4189+        # successful.
4190+        try:
4191+            bht.set_hashes(leaves={segnum: blockhash})
4192+        except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
4193+                IndexError), e:
4194+            raise CorruptShareError(reader.peerid,
4195+                                    reader.shnum,
4196+                                    "block hash tree failure: %s" % e)
4197+
4198+        # Reaching this point means that we know that this segment
4199+        # is correct. Now we need to check to see whether the share
4200+        # hash chain is also correct.
4201+        # SDMF wrote share hash chains that didn't contain the
4202+        # leaves, which would be produced from the block hash tree.
4203+        # So we need to validate the block hash tree first. If
4204+        # successful, then bht[0] will contain the root for the
4205+        # shnum, which will be a leaf in the share hash tree, which
4206+        # will allow us to validate the rest of the tree.
4207+        if self.share_hash_tree.needed_hashes(reader.shnum,
4208+                                               include_leaf=True):
4209+            try:
4210+                self.share_hash_tree.set_hashes(hashes=sharehashes[1],
4211+                                            leaves={reader.shnum: bht[0]})
4212+            except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
4213+                    IndexError), e:
4214+                raise CorruptShareError(reader.peerid,
4215+                                        reader.shnum,
4216+                                        "corrupt hashes: %s" % e)
4217+
4218+        # TODO: Validate the salt, too.
4219+        self.log('share %d is valid for segment %d' % (reader.shnum,
4220+                                                       segnum))
4221+        return {reader.shnum: (block, salt)}
4222+
4223+
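The relationship that _validate_block relies on -- the root of a share's block hash tree doubles as that share's leaf in the share hash tree -- can be sketched with the same hashtree/hashutil calls used elsewhere in this file. This is an illustrative sketch only, assuming those APIs behave as they do in this patch:

    from allmydata import hashtree
    from allmydata.util import hashutil

    def block_hash_root(blocks):
        """Build a block hash tree over a share's blocks and return its root."""
        leaves = [hashutil.block_hash(b) for b in blocks]
        return hashtree.HashTree(leaves)[0]

    # That root is then used as the leaf for this share when validating the
    # share hash chain, roughly:
    #   sht = hashtree.IncompleteHashTree(N)
    #   sht.set_hashes({0: root_hash})  # root_hash was checked via the signature
    #   sht.set_hashes(hashes=share_hash_chain,
    #                  leaves={shnum: block_hash_root(blocks)})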
4224+    def _get_needed_hashes(self, reader, segnum):
4225+        """
4226+        I get the hashes needed to validate segnum from the reader, then return
4227+        to my caller when this is done.
4228+        """
4229+        bht = self._block_hash_trees[reader.shnum]
4230+        needed = bht.needed_hashes(segnum, include_leaf=True)
4231+        # The root of the block hash tree is also a leaf in the share
4232+        # hash tree. So we don't need to fetch it from the remote
4233+        # server. In the case of files with one segment, this means that
4234+        # we won't fetch any block hash tree from the remote server,
4235+        # since the hash of each share of the file is the entire block
4236+        # hash tree, and is a leaf in the share hash tree. This is fine,
4237+        # since any share corruption will be detected in the share hash
4238+        # tree.
4239+        #needed.discard(0)
4240+        self.log("getting blockhashes for segment %d, share %d: %s" % \
4241+                 (segnum, reader.shnum, str(needed)))
4242+        d1 = reader.get_blockhashes(needed, queue=True, force_remote=True)
4243+        if self.share_hash_tree.needed_hashes(reader.shnum):
4244+            need = self.share_hash_tree.needed_hashes(reader.shnum)
4245+            self.log("also need sharehashes for share %d: %s" % (reader.shnum,
4246+                                                                 str(need)))
4247+            d2 = reader.get_sharehashes(need, queue=True, force_remote=True)
4248+        else:
4249+            d2 = defer.succeed({}) # the logic in the next method
4250+                                   # expects a dict
4251+        dl = defer.DeferredList([d1, d2])
4252+        return dl
4253+
4254+
4255+    def _decode_blocks(self, blocks_and_salts, segnum):
4256+        """
4257+        I take a list of k blocks and salts, and decode that into a
4258+        single encrypted segment.
4259+        """
4260+        d = {}
4261+        # We want to merge our dictionaries to the form
4262+        # {shnum: blocks_and_salts}
4263+        #
4264+        # The dictionaries come from _validate_block that way, so we just
4265+        # need to merge them.
4266+        for block_and_salt in blocks_and_salts:
4267+            d.update(block_and_salt[1])
4268+
4269+        # All of these blocks should have the same salt; in SDMF, it is
4270+        # the file-wide IV, while in MDMF it is the per-segment salt. In
4271+        # either case, we just need to get one of them and use it.
4272+        #
4273+        # d.items()[0] is like (shnum, (block, salt))
4274+        # d.items()[0][1] is like (block, salt)
4275+        # d.items()[0][1][1] is the salt.
4276+        salt = d.items()[0][1][1]
4277+        # Next, extract just the blocks from the dict. We'll use the
4278+        # salt in the next step.
4279+        share_and_shareids = [(k, v[0]) for k, v in d.items()]
4280+        d2 = dict(share_and_shareids)
4281+        shareids = []
4282+        shares = []
4283+        for shareid, share in d2.items():
4284             shareids.append(shareid)
4285             shares.append(share)
4286 
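To make the data shapes above concrete, here is a small standalone sketch (not part of the patch) of the merge that _decode_blocks performs: each _validate_block result is a one-entry dict mapping a share number to its (block, salt) pair, and all salts are identical within one segment.

    def merge_validated_blocks(results):
        """results: iterable of (success, {shnum: (block, salt)}) pairs."""
        merged = {}
        for success, per_share in results:
            if success:
                merged.update(per_share)
        # every entry carries the same salt (the file-wide IV for SDMF, the
        # per-segment salt for MDMF), so any one of them will do
        salt = list(merged.values())[0][1]
        shareids = list(merged.keys())
        shares = [block for (block, _salt) in merged.values()]
        return shareids, shares, salt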
4287hunk ./src/allmydata/mutable/retrieve.py 724
4288-        assert len(shareids) >= k, len(shareids)
4289+        assert len(shareids) >= self._required_shares, len(shareids)
4290         # zfec really doesn't want extra shares
4291hunk ./src/allmydata/mutable/retrieve.py 726
4292-        shareids = shareids[:k]
4293-        shares = shares[:k]
4294-
4295-        fec = codec.CRSDecoder()
4296-        fec.set_params(segsize, k, N)
4297-
4298-        self.log("params %s, we have %d shares" % ((segsize, k, N), len(shares)))
4299-        self.log("about to decode, shareids=%s" % (shareids,))
4300-        d = defer.maybeDeferred(fec.decode, shares, shareids)
4301-        def _done(buffers):
4302-            self._status.timings["decode"] = time.time() - started
4303-            self.log(" decode done, %d buffers" % len(buffers))
4304+        shareids = shareids[:self._required_shares]
4305+        shares = shares[:self._required_shares]
4306+        self.log("decoding segment %d" % segnum)
4307+        if segnum == self._num_segments - 1:
4308+            d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids)
4309+        else:
4310+            d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids)
4311+        def _process(buffers):
4312             segment = "".join(buffers)
4313hunk ./src/allmydata/mutable/retrieve.py 735
4314+            self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
4315+                     segnum=segnum,
4316+                     numsegs=self._num_segments,
4317+                     level=log.NOISY)
4318             self.log(" joined length %d, datalength %d" %
4319hunk ./src/allmydata/mutable/retrieve.py 740
4320-                     (len(segment), datalength))
4321-            segment = segment[:datalength]
4322+                     (len(segment), self._data_length))
4323+            if segnum == self._num_segments - 1:
4324+                size_to_use = self._tail_data_size
4325+            else:
4326+                size_to_use = self._segment_size
4327+            segment = segment[:size_to_use]
4328             self.log(" segment len=%d" % len(segment))
4329hunk ./src/allmydata/mutable/retrieve.py 747
4330-            return segment
4331-        def _err(f):
4332-            self.log(" decode failed: %s" % f)
4333-            return f
4334-        d.addCallback(_done)
4335-        d.addErrback(_err)
4336+            return segment, salt
4337+        d.addCallback(_process)
4338         return d
4339 
4340hunk ./src/allmydata/mutable/retrieve.py 751
4341-    def _decrypt(self, crypttext, IV, readkey):
4342+
4343+    def _decrypt_segment(self, segment_and_salt):
4344+        """
4345+        I take a single segment and its salt, and decrypt it. I return
4346+        the plaintext of the segment that is in my argument.
4347+        """
4348+        segment, salt = segment_and_salt
4349         self._status.set_status("decrypting")
4350hunk ./src/allmydata/mutable/retrieve.py 759
4351+        self.log("decrypting segment %d" % self._current_segment)
4352         started = time.time()
4353hunk ./src/allmydata/mutable/retrieve.py 761
4354-        key = hashutil.ssk_readkey_data_hash(IV, readkey)
4355+        key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
4356         decryptor = AES(key)
4357hunk ./src/allmydata/mutable/retrieve.py 763
4358-        plaintext = decryptor.process(crypttext)
4359+        plaintext = decryptor.process(segment)
4360         self._status.timings["decrypt"] = time.time() - started
4361         return plaintext
4362 
4363hunk ./src/allmydata/mutable/retrieve.py 767
4364-    def _done(self, res):
4365-        if not self._running:
4366+
4367+    def notify_server_corruption(self, peerid, shnum, reason):
4368+        ss = self.servermap.connections[peerid]
4369+        ss.callRemoteOnly("advise_corrupt_share",
4370+                          "mutable", self._storage_index, shnum, reason)
4371+
4372+
4373+    def _try_to_validate_privkey(self, enc_privkey, reader):
4374+
4375+        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
4376+        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
4377+        if alleged_writekey != self._node.get_writekey():
4378+            self.log("invalid privkey from %s shnum %d" %
4379+                     (reader, reader.shnum),
4380+                     level=log.WEIRD, umid="YIw4tA")
4381             return
4382hunk ./src/allmydata/mutable/retrieve.py 783
4383-        self._running = False
4384-        self._status.set_active(False)
4385-        self._status.timings["total"] = time.time() - self._started
4386-        # res is either the new contents, or a Failure
4387-        if isinstance(res, failure.Failure):
4388-            self.log("Retrieve done, with failure", failure=res,
4389-                     level=log.UNUSUAL)
4390-            self._status.set_status("Failed")
4391-        else:
4392-            self.log("Retrieve done, success!")
4393-            self._status.set_status("Finished")
4394-            self._status.set_progress(1.0)
4395-            # remember the encoding parameters, use them again next time
4396-            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
4397-             offsets_tuple) = self.verinfo
4398-            self._node._populate_required_shares(k)
4399-            self._node._populate_total_shares(N)
4400-        eventually(self._done_deferred.callback, res)
4401 
4402hunk ./src/allmydata/mutable/retrieve.py 784
4403+        # it's good
4404+        self.log("got valid privkey from shnum %d on reader %s" %
4405+                 (reader.shnum, reader))
4406+        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
4407+        self._node._populate_encprivkey(enc_privkey)
4408+        self._node._populate_privkey(privkey)
4409+        self._need_privkey = False
4410+
4411+
4412+    def _check_for_done(self, res):
4413+        """
4414+        I check to see if this Retrieve object has successfully finished
4415+        its work.
4416+
4417+        I can exit in the following ways:
4418+            - If there are no more segments to download, then I exit by
4419+              causing self._done_deferred to fire with the plaintext
4420+              content requested by the caller.
4421+            - If there are still segments to be downloaded, and there
4422+              are enough active readers (readers which have not broken
4423+              and have not given us corrupt data) to continue
4424+              downloading, I send control back to
4425+              _download_current_segment.
4426+            - If there are still segments to be downloaded but there are
4427+              not enough active peers to download them, I ask
4428+              _add_active_peers to add more peers. If it is successful,
4429+              it will call _download_current_segment. If there are not
4430+              enough peers to retrieve the file, then that will cause
4431+              _done_deferred to errback.
4432+        """
4433+        self.log("checking for doneness")
4434+        if self._current_segment == self._num_segments:
4435+            # No more segments to download, we're done.
4436+            self.log("got plaintext, done")
4437+            return self._done()
4438+
4439+        if len(self._active_readers) >= self._required_shares:
4440+            # More segments to download, but we have enough good peers
4441+            # in self._active_readers that we can do that without issue,
4442+            # so go nab the next segment.
4443+            self.log("not done yet: on segment %d of %d" % \
4444+                     (self._current_segment + 1, self._num_segments))
4445+            return self._download_current_segment()
4446+
4447+        self.log("not done yet: on segment %d of %d, need to add peers" % \
4448+                 (self._current_segment + 1, self._num_segments))
4449+        return self._add_active_peers()
4450+
4451+
4452+    def _done(self):
4453+        """
4454+        I am called by _check_for_done when the download process has
4455+        finished successfully. After making some useful logging
4456+        statements, I return the decrypted contents to the owner of this
4457+        Retrieve object through self._done_deferred.
4458+        """
4459+        eventually(self._done_deferred.callback, self._plaintext)
4460+
4461+
4462+    def _failed(self):
4463+        """
4464+        I am called by _add_active_peers when there are not enough
4465+        active peers left to complete the download. After making some
4466+        useful logging statements, I return an exception to that effect
4467+        to the caller of this Retrieve object through
4468+        self._done_deferred.
4469+        """
4470+        format = ("ran out of peers: "
4471+                  "have %(have)d of %(total)d segments, "
4472+                  "found %(bad)d bad shares, "
4473+                  "encoding %(k)d-of-%(n)d")
4474+        args = {"have": self._current_segment,
4475+                "total": self._num_segments,
4476+                "k": self._required_shares,
4477+                "n": self._total_shares,
4478+                "bad": len(self._bad_shares)}
4479+        e = NotEnoughSharesError("%s, last failure: %s" % (format % args,
4480+                                                        str(self._last_failure)))
4481+        f = failure.Failure(e)
4482+        eventually(self._done_deferred.callback, f)
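For a quick sanity check of the error text built above, this is what the format string renders to for a 3-of-10 file that failed before completing its first segment (illustrative values only):

    args = {"have": 0, "total": 1, "k": 3, "n": 10, "bad": 0}
    msg = ("ran out of peers: have %(have)d of %(total)d segments, "
           "found %(bad)d bad shares, encoding %(k)d-of-%(n)d" % args)
    # -> "ran out of peers: have 0 of 1 segments, found 0 bad shares, encoding 3-of-10"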
4483hunk ./src/allmydata/test/test_mutable.py 12
4484 from allmydata.util.hashutil import tagged_hash, ssk_writekey_hash, \
4485      ssk_pubkey_fingerprint_hash
4486 from allmydata.interfaces import IRepairResults, ICheckAndRepairResults, \
4487-     NotEnoughSharesError
4488+     NotEnoughSharesError, SDMF_VERSION, MDMF_VERSION
4489 from allmydata.monitor import Monitor
4490 from allmydata.test.common import ShouldFailMixin
4491 from allmydata.test.no_network import GridTestMixin
4492hunk ./src/allmydata/test/test_mutable.py 103
4493         d = fireEventually()
4494         d.addCallback(lambda res: _call())
4495         return d
4496+
4497     def callRemoteOnly(self, methname, *args, **kwargs):
4498         d = self.callRemote(methname, *args, **kwargs)
4499         d.addBoth(lambda ignore: None)
4500hunk ./src/allmydata/test/test_mutable.py 286
4501         d.addCallback(_created)
4502         return d
4503 
4504+
4505+    def test_upload_and_download_mdmf(self):
4506+        d = self.nodemaker.create_mutable_file(version=MDMF_VERSION)
4507+        def _created(n):
4508+            d = defer.succeed(None)
4509+            d.addCallback(lambda ignored:
4510+                n.get_servermap(MODE_READ))
4511+            def _then(servermap):
4512+                dumped = servermap.dump(StringIO())
4513+                self.failUnlessIn("3-of-10", dumped.getvalue())
4514+            d.addCallback(_then)
4515+            # Now overwrite the contents with some new contents. We want
4516+            # to make them big enough to force the file to be uploaded
4517+            # in more than one segment.
4518+            big_contents = "contents1" * 100000 # about 900 KiB
4519+            d.addCallback(lambda ignored:
4520+                n.overwrite(big_contents))
4521+            d.addCallback(lambda ignored:
4522+                n.download_best_version())
4523+            d.addCallback(lambda data:
4524+                self.failUnlessEqual(data, big_contents))
4525+            # Overwrite the contents again with some new contents. As
4526+            # before, they need to be big enough to force multiple
4527+            # segments, so that we make the downloader deal with
4528+            # multiple segments.
4529+            bigger_contents = "contents2" * 1000000 # about 9MiB
4530+            d.addCallback(lambda ignored:
4531+                n.overwrite(bigger_contents))
4532+            d.addCallback(lambda ignored:
4533+                n.download_best_version())
4534+            d.addCallback(lambda data:
4535+                self.failUnlessEqual(data, bigger_contents))
4536+            return d
4537+        d.addCallback(_created)
4538+        return d
4539+
4540+
4541     def test_create_with_initial_contents(self):
4542         d = self.nodemaker.create_mutable_file("contents 1")
4543         def _created(n):
4544hunk ./src/allmydata/test/test_mutable.py 1006
4545     test_no_servers_download.timeout = 15
4546 
4547     def _test_corrupt_all(self, offset, substring,
4548-                          should_succeed=False, corrupt_early=True,
4549-                          failure_checker=None):
4550+                          should_succeed=False,
4551+                          corrupt_early=True,
4552+                          failure_checker=None,
4553+                          fetch_privkey=False):
4554         d = defer.succeed(None)
4555         if corrupt_early:
4556             d.addCallback(corrupt, self._storage, offset)
4557hunk ./src/allmydata/test/test_mutable.py 1026
4558                     self.failUnlessIn(substring, "".join(allproblems))
4559                 return servermap
4560             if should_succeed:
4561-                d1 = self._fn.download_version(servermap, ver)
4562+                d1 = self._fn.download_version(servermap, ver,
4563+                                               fetch_privkey)
4564                 d1.addCallback(lambda new_contents:
4565                                self.failUnlessEqual(new_contents, self.CONTENTS))
4566             else:
4567hunk ./src/allmydata/test/test_mutable.py 1034
4568                 d1 = self.shouldFail(NotEnoughSharesError,
4569                                      "_corrupt_all(offset=%s)" % (offset,),
4570                                      substring,
4571-                                     self._fn.download_version, servermap, ver)
4572+                                     self._fn.download_version, servermap,
4573+                                                                ver,
4574+                                                                fetch_privkey)
4575             if failure_checker:
4576                 d1.addCallback(failure_checker)
4577             d1.addCallback(lambda res: servermap)
4578hunk ./src/allmydata/test/test_mutable.py 1045
4579         return d
4580 
4581     def test_corrupt_all_verbyte(self):
4582-        # when the version byte is not 0, we hit an UnknownVersionError error
4583-        # in unpack_share().
4584+        # when the version byte is not 0 or 1, we hit an UnknownVersionError
4585+        # error in unpack_share().
4586         d = self._test_corrupt_all(0, "UnknownVersionError")
4587         def _check_servermap(servermap):
4588             # and the dump should mention the problems
4589hunk ./src/allmydata/test/test_mutable.py 1052
4590             s = StringIO()
4591             dump = servermap.dump(s).getvalue()
4592-            self.failUnless("10 PROBLEMS" in dump, dump)
4593+            self.failUnless("30 PROBLEMS" in dump, dump)
4594         d.addCallback(_check_servermap)
4595         return d
4596 
4597hunk ./src/allmydata/test/test_mutable.py 1122
4598         return self._test_corrupt_all("enc_privkey", None, should_succeed=True)
4599 
4600 
4601+    def test_corrupt_all_encprivkey_late(self):
4602+        # this should work for the same reason as above, but we corrupt
4603+        # after the servermap update to exercise the error handling
4604+        # code.
4605+        # We need to remove the privkey from the node, or the retrieve
4606+        # process won't know to update it.
4607+        self._fn._privkey = None
4608+        return self._test_corrupt_all("enc_privkey",
4609+                                      None, # this shouldn't fail
4610+                                      should_succeed=True,
4611+                                      corrupt_early=False,
4612+                                      fetch_privkey=True)
4613+
4614+
4615     def test_corrupt_all_seqnum_late(self):
4616         # corrupting the seqnum between mapupdate and retrieve should result
4617         # in NotEnoughSharesError, since each share will look invalid
4618hunk ./src/allmydata/test/test_mutable.py 1142
4619         def _check(res):
4620             f = res[0]
4621             self.failUnless(f.check(NotEnoughSharesError))
4622-            self.failUnless("someone wrote to the data since we read the servermap" in str(f))
4623+            self.failUnless("uncoordinated write" in str(f))
4624         return self._test_corrupt_all(1, "ran out of peers",
4625                                       corrupt_early=False,
4626                                       failure_checker=_check)
4627hunk ./src/allmydata/test/test_mutable.py 1916
4628             d.addCallback(lambda res:
4629                           self.shouldFail(NotEnoughSharesError,
4630                                           "test_retrieve_surprise",
4631-                                          "ran out of peers: have 0 shares (k=3)",
4632+                                          "ran out of peers: have 0 of 1",
4633                                           n.download_version,
4634                                           self.old_map,
4635                                           self.old_map.best_recoverable_version(),
4636hunk ./src/allmydata/test/test_mutable.py 1925
4637         d.addCallback(_created)
4638         return d
4639 
4640+
4641     def test_unexpected_shares(self):
4642         # upload the file, take a servermap, shut down one of the servers,
4643         # upload it again (causing shares to appear on a new server), then
4644hunk ./src/allmydata/test/test_mutable.py 2129
4645         self.basedir = "mutable/Problems/test_privkey_query_missing"
4646         self.set_up_grid(num_servers=20)
4647         nm = self.g.clients[0].nodemaker
4648-        LARGE = "These are Larger contents" * 2000 # about 50KB
4649+        LARGE = "These are Larger contents" * 2000 # about 50KiB
4650         nm._node_cache = DevNullDictionary() # disable the nodecache
4651 
4652         d = nm.create_mutable_file(LARGE)
4653}
4654[Alter the ServermapUpdater to find MDMF files
4655Kevan Carstensen <kevan@isnotajoke.com>**20100624234208
4656 Ignore-this: 8f89a4f853bc3096990cddf0e0644813
4657 
4658 The servermapupdater should find MDMF files on a grid in the same way
4659 that it finds SDMF files. This patch makes it do that.
4660] {
4661hunk ./src/allmydata/mutable/servermap.py 7
4662 from itertools import count
4663 from twisted.internet import defer
4664 from twisted.python import failure
4665-from foolscap.api import DeadReferenceError, RemoteException, eventually
4666+from foolscap.api import DeadReferenceError, RemoteException, eventually, \
4667+                         fireEventually
4668 from allmydata.util import base32, hashutil, idlib, log
4669 from allmydata.storage.server import si_b2a
4670 from allmydata.interfaces import IServermapUpdaterStatus
4671hunk ./src/allmydata/mutable/servermap.py 17
4672 from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
4673      DictOfSets, CorruptShareError, NeedMoreDataError
4674 from allmydata.mutable.layout import unpack_prefix_and_signature, unpack_header, unpack_share, \
4675-     SIGNED_PREFIX_LENGTH
4676+     SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy
4677 
4678 class UpdateStatus:
4679     implements(IServermapUpdaterStatus)
4680hunk ./src/allmydata/mutable/servermap.py 254
4681         """Return a set of versionids, one for each version that is currently
4682         recoverable."""
4683         versionmap = self.make_versionmap()
4684-
4685         recoverable_versions = set()
4686         for (verinfo, shares) in versionmap.items():
4687             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
4688hunk ./src/allmydata/mutable/servermap.py 366
4689         self._servers_responded = set()
4690 
4691         # how much data should we read?
4692+        # SDMF:
4693         #  * if we only need the checkstring, then [0:75]
4694         #  * if we need to validate the checkstring sig, then [543ish:799ish]
4695         #  * if we need the verification key, then [107:436ish]
4696hunk ./src/allmydata/mutable/servermap.py 374
4697         #  * if we need the encrypted private key, we want [-1216ish:]
4698         #   * but we can't read from negative offsets
4699         #   * the offset table tells us the 'ish', also the positive offset
4700-        # A future version of the SMDF slot format should consider using
4701-        # fixed-size slots so we can retrieve less data. For now, we'll just
4702-        # read 2000 bytes, which also happens to read enough actual data to
4703-        # pre-fetch a 9-entry dirnode.
4704+        # MDMF:
4705+        #  * Checkstring? [0:72]
4706+        #  * If we want to validate the checkstring, then [0:72], [143:?] --
4707+        #    the offset table will tell us for sure.
4708+        #  * If we need the verification key, we have to consult the offset
4709+        #    table as well.
4710+        # At this point, we don't know which we are. Our filenode can
4711+        # tell us, but it might be lying -- in some cases, we're
4712+        # responsible for telling it which kind of file it is.
4713         self._read_size = 4000
4714         if mode == MODE_CHECK:
4715             # we use unpack_prefix_and_signature, so we need 1k
4716hunk ./src/allmydata/mutable/servermap.py 432
4717         self._queries_completed = 0
4718 
4719         sb = self._storage_broker
4720+        # All of the peers, permuted by the storage index, as usual.
4721         full_peerlist = sb.get_servers_for_index(self._storage_index)
4722         self.full_peerlist = full_peerlist # for use later, immutable
4723         self.extra_peers = full_peerlist[:] # peers are removed as we use them
4724hunk ./src/allmydata/mutable/servermap.py 439
4725         self._good_peers = set() # peers who had some shares
4726         self._empty_peers = set() # peers who don't have any shares
4727         self._bad_peers = set() # peers to whom our queries failed
4728+        self._readers = {} # peerid -> dict(sharewriters), filled in
4729+                           # after responses come in.
4730 
4731         k = self._node.get_required_shares()
4732hunk ./src/allmydata/mutable/servermap.py 443
4733+        # In which cases can k be None here (forcing us to guess)?
4734         if k is None:
4735             # make a guess
4736             k = 3
4737hunk ./src/allmydata/mutable/servermap.py 456
4738         self.num_peers_to_query = k + self.EPSILON
4739 
4740         if self.mode == MODE_CHECK:
4741+            # We want to query all of the peers.
4742             initial_peers_to_query = dict(full_peerlist)
4743             must_query = set(initial_peers_to_query.keys())
4744             self.extra_peers = []
4745hunk ./src/allmydata/mutable/servermap.py 464
4746             # we're planning to replace all the shares, so we want a good
4747             # chance of finding them all. We will keep searching until we've
4748             # seen epsilon that don't have a share.
4749+            # We don't query all of the peers because that could take a while.
4750             self.num_peers_to_query = N + self.EPSILON
4751             initial_peers_to_query, must_query = self._build_initial_querylist()
4752             self.required_num_empty_peers = self.EPSILON
4753hunk ./src/allmydata/mutable/servermap.py 474
4754             # might also avoid the round trip required to read the encrypted
4755             # private key.
4756 
4757-        else:
4758+        else: # MODE_READ, MODE_ANYTHING
4759+            # 2k peers is good enough.
4760             initial_peers_to_query, must_query = self._build_initial_querylist()
4761 
4762         # this is a set of peers that we are required to get responses from:
4763hunk ./src/allmydata/mutable/servermap.py 490
4764         # before we can consider ourselves finished, and self.extra_peers
4765         # contains the overflow (peers that we should tap if we don't get
4766         # enough responses)
4767+        # I guess that self._must_query is a subset of
4768+        # initial_peers_to_query?
4769+        assert set(must_query).issubset(set(initial_peers_to_query))
4770 
4771         self._send_initial_requests(initial_peers_to_query)
4772         self._status.timings["initial_queries"] = time.time() - self._started
4773hunk ./src/allmydata/mutable/servermap.py 549
4774         # errors that aren't handled by _query_failed (and errors caused by
4775         # _query_failed) get logged, but we still want to check for doneness.
4776         d.addErrback(log.err)
4777-        d.addBoth(self._check_for_done)
4778         d.addErrback(self._fatal_error)
4779hunk ./src/allmydata/mutable/servermap.py 550
4780+        d.addCallback(self._check_for_done)
4781         return d
4782 
4783     def _do_read(self, ss, peerid, storage_index, shnums, readv):
4784hunk ./src/allmydata/mutable/servermap.py 569
4785         d = ss.callRemote("slot_readv", storage_index, shnums, readv)
4786         return d
4787 
4788+
4789+    def _got_corrupt_share(self, e, shnum, peerid, data, lp):
4790+        """
4791+        I am called when a remote server returns a corrupt share in
4792+        response to one of our queries. By corrupt, I mean a share
4793+        without a valid signature. I then record the failure, notify the
4794+        server of the corruption, and record the share as bad.
4795+        """
4796+        f = failure.Failure(e)
4797+        self.log(format="bad share: %(f_value)s", f_value=str(f.value),
4798+                 failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
4799+        # Notify the server that its share is corrupt.
4800+        self.notify_server_corruption(peerid, shnum, str(e))
4801+        # By flagging this as a bad peer, we won't count any of
4802+        # the other shares on that peer as valid, though if we
4803+        # happen to find a valid version string amongst those
4804+        # shares, we'll keep track of it so that we don't need
4805+        # to validate the signature on those again.
4806+        self._bad_peers.add(peerid)
4807+        self._last_failure = f
4808+        # XXX: Use the reader for this?
4809+        checkstring = data[:SIGNED_PREFIX_LENGTH]
4810+        self._servermap.mark_bad_share(peerid, shnum, checkstring)
4811+        self._servermap.problems.append(f)
4812+
4813+
4814+    def _cache_good_sharedata(self, verinfo, shnum, now, data):
4815+        """
4816+        If one of my queries returns successfully (which means that we
4817+        were able to validate the signature), I
4818+        cache the data that we initially fetched from the storage
4819+        server. This will help reduce the number of roundtrips that need
4820+        to occur when the file is downloaded, or when the file is
4821+        updated.
4822+        """
4823+        self._node._add_to_cache(verinfo, shnum, 0, data, now)
4824+
4825+
4826     def _got_results(self, datavs, peerid, readsize, stuff, started):
4827         lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
4828                       peerid=idlib.shortnodeid_b2a(peerid),
4829hunk ./src/allmydata/mutable/servermap.py 630
4830         else:
4831             self._empty_peers.add(peerid)
4832 
4833-        last_verinfo = None
4834-        last_shnum = None
4835+        ss, storage_index = stuff
4836+        ds = []
4837+
4838         for shnum,datav in datavs.items():
4839             data = datav[0]
4840hunk ./src/allmydata/mutable/servermap.py 635
4841-            try:
4842-                verinfo = self._got_results_one_share(shnum, data, peerid, lp)
4843-                last_verinfo = verinfo
4844-                last_shnum = shnum
4845-                self._node._add_to_cache(verinfo, shnum, 0, data, now)
4846-            except CorruptShareError, e:
4847-                # log it and give the other shares a chance to be processed
4848-                f = failure.Failure()
4849-                self.log(format="bad share: %(f_value)s", f_value=str(f.value),
4850-                         failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
4851-                self.notify_server_corruption(peerid, shnum, str(e))
4852-                self._bad_peers.add(peerid)
4853-                self._last_failure = f
4854-                checkstring = data[:SIGNED_PREFIX_LENGTH]
4855-                self._servermap.mark_bad_share(peerid, shnum, checkstring)
4856-                self._servermap.problems.append(f)
4857-                pass
4858-
4859-        self._status.timings["cumulative_verify"] += (time.time() - now)
4860+            reader = MDMFSlotReadProxy(ss,
4861+                                       storage_index,
4862+                                       shnum,
4863+                                       data)
4864+            self._readers.setdefault(peerid, dict())[shnum] = reader
4865+            # our goal, with each response, is to validate the version
4866+            # information and share data as best we can at this point --
4867+            # we do this by validating the signature. To do this, we
4868+            # need to do the following:
4869+            #   - If we don't already have the public key, fetch the
4870+            #     public key. We use this to validate the signature.
4871+            if not self._node.get_pubkey():
4872+                # fetch and set the public key.
4873+                d = reader.get_verification_key()
4874+                d.addCallback(lambda results, shnum=shnum, peerid=peerid:
4875+                    self._try_to_set_pubkey(results, peerid, shnum, lp))
4876+                # XXX: Make self._pubkey_query_failed?
4877+                d.addErrback(lambda error, shnum=shnum, peerid=peerid:
4878+                    self._got_corrupt_share(error, shnum, peerid, data, lp))
4879+            else:
4880+                # we already have the public key.
4881+                d = defer.succeed(None)
4882+            # Neither of these two branches returns anything of
4883+            # consequence, so the first entry in our DeferredList will
4884+            # be None.
4885 
4886hunk ./src/allmydata/mutable/servermap.py 661
4887-        if self._need_privkey and last_verinfo:
4888-            # send them a request for the privkey. We send one request per
4889-            # server.
4890-            lp2 = self.log("sending privkey request",
4891-                           parent=lp, level=log.NOISY)
4892-            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
4893-             offsets_tuple) = last_verinfo
4894-            o = dict(offsets_tuple)
4895+            # - Next, we need the version information. We almost
4896+            #   certainly got this by reading the first thousand or so
4897+            #   bytes of the share on the storage server, so we
4898+            #   shouldn't need to fetch anything at this step.
4899+            d2 = reader.get_verinfo()
4900+            d2.addErrback(lambda error, shnum=shnum, peerid=peerid:
4901+                self._got_corrupt_share(error, shnum, peerid, data, lp))
4902+            # - Next, we need the signature. For an SDMF share, it is
4903+            #   likely that we fetched this when doing our initial fetch
4904+            #   to get the version information. In MDMF, this lives at
4905+            #   the end of the share, so unless the file is quite small,
4906+            #   we'll need to do a remote fetch to get it.
4907+            d3 = reader.get_signature()
4908+            d3.addErrback(lambda error, shnum=shnum, peerid=peerid:
4909+                self._got_corrupt_share(error, shnum, peerid, data, lp))
4910+            #  Once we have all three of these responses, we can move on
4911+            #  to validating the signature
4912 
4913hunk ./src/allmydata/mutable/servermap.py 679
4914-            self._queries_outstanding.add(peerid)
4915-            readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
4916-            ss = self._servermap.connections[peerid]
4917-            privkey_started = time.time()
4918-            d = self._do_read(ss, peerid, self._storage_index,
4919-                              [last_shnum], readv)
4920-            d.addCallback(self._got_privkey_results, peerid, last_shnum,
4921-                          privkey_started, lp2)
4922-            d.addErrback(self._privkey_query_failed, peerid, last_shnum, lp2)
4923-            d.addErrback(log.err)
4924-            d.addCallback(self._check_for_done)
4925-            d.addErrback(self._fatal_error)
4926+            # Does the node already have a privkey? If not, we'll try to
4927+            # fetch it here.
4928+            if self._need_privkey:
4929+                d4 = reader.get_encprivkey()
4930+                d4.addCallback(lambda results, shnum=shnum, peerid=peerid:
4931+                    self._try_to_validate_privkey(results, peerid, shnum, lp))
4932+                d4.addErrback(lambda error, shnum=shnum, peerid=peerid:
4933+                    self._privkey_query_failed(error, shnum, data, lp))
4934+            else:
4935+                d4 = defer.succeed(None)
4936 
4937hunk ./src/allmydata/mutable/servermap.py 690
4938+            dl = defer.DeferredList([d, d2, d3, d4])
4939+            dl.addCallback(lambda results, shnum=shnum, peerid=peerid:
4940+                self._got_signature_one_share(results, shnum, peerid, lp))
4941+            dl.addErrback(lambda error, shnum=shnum, data=data:
4942+               self._got_corrupt_share(error, shnum, peerid, data, lp))
4943+            dl.addCallback(lambda verinfo, shnum=shnum, peerid=peerid, data=data:
4944+                self._cache_good_sharedata(verinfo, shnum, now, data))
4945+            ds.append(dl)
4946+        # dl is a deferred list that will fire when all of the shares
4947+        # that we found on this peer are done processing. When dl fires,
4948+        # we know that processing is done, so we can decrement the
4949+        # semaphore-like thing that we incremented earlier.
4950+        dl = defer.DeferredList(ds, fireOnOneErrback=True)
4951+        # Are we done? Done means that there are no more queries to
4952+        # send, that there are no outstanding queries, and that we
4953+        # haven't received any queries that are still processing. If we
4954+        # are done, self._check_for_done will cause the done deferred
4955+        # that we returned to our caller to fire, which tells them that
4956+        # they have a complete servermap, and that we won't be touching
4957+        # the servermap anymore.
4958+        dl.addCallback(self._check_for_done)
4959+        dl.addErrback(self._fatal_error)
4960         # all done!
4961         self.log("_got_results done", parent=lp, level=log.NOISY)
4962hunk ./src/allmydata/mutable/servermap.py 714
4963+        return dl
4964+
4965+
4966+    def _try_to_set_pubkey(self, pubkey_s, peerid, shnum, lp):
4967+        if self._node.get_pubkey():
4968+            return # don't go through this again if we don't have to
4969+        fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
4970+        assert len(fingerprint) == 32
4971+        if fingerprint != self._node.get_fingerprint():
4972+            raise CorruptShareError(peerid, shnum,
4973+                                "pubkey doesn't match fingerprint")
4974+        self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
4975+        assert self._node.get_pubkey()
4976+
4977 
4978     def notify_server_corruption(self, peerid, shnum, reason):
4979         ss = self._servermap.connections[peerid]
4980hunk ./src/allmydata/mutable/servermap.py 734
4981         ss.callRemoteOnly("advise_corrupt_share",
4982                           "mutable", self._storage_index, shnum, reason)
4983 
4984-    def _got_results_one_share(self, shnum, data, peerid, lp):
4985+
4986+    def _got_signature_one_share(self, results, shnum, peerid, lp):
4987+        # It is our job to give versioninfo to our caller. We need to
4988+        # raise CorruptShareError if the share is corrupt for any
4989+        # reason, something that our caller will handle.
4990         self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
4991                  shnum=shnum,
4992                  peerid=idlib.shortnodeid_b2a(peerid),
4993hunk ./src/allmydata/mutable/servermap.py 744
4994                  level=log.NOISY,
4995                  parent=lp)
4996-
4997-        # this might raise NeedMoreDataError, if the pubkey and signature
4998-        # live at some weird offset. That shouldn't happen, so I'm going to
4999-        # treat it as a bad share.
5000-        (seqnum, root_hash, IV, k, N, segsize, datalength,
5001-         pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
5002-
5003-        if not self._node.get_pubkey():
5004-            fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
5005-            assert len(fingerprint) == 32
5006-            if fingerprint != self._node.get_fingerprint():
5007-                raise CorruptShareError(peerid, shnum,
5008-                                        "pubkey doesn't match fingerprint")
5009-            self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
5010-
5011-        if self._need_privkey:
5012-            self._try_to_extract_privkey(data, peerid, shnum, lp)
5013-
5014-        (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
5015-         ig_segsize, ig_datalen, offsets) = unpack_header(data)
5016+        _, verinfo, signature, __ = results
5017+        (seqnum,
5018+         root_hash,
5019+         saltish,
5020+         segsize,
5021+         datalen,
5022+         k,
5023+         n,
5024+         prefix,
5025+         offsets) = verinfo[1]
5026         offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
5027 
5028hunk ./src/allmydata/mutable/servermap.py 756
5029-        verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
5030+        # XXX: This should be done for us in the method, so
5031+        # presumably you can go in there and fix it.
5032+        verinfo = (seqnum,
5033+                   root_hash,
5034+                   saltish,
5035+                   segsize,
5036+                   datalen,
5037+                   k,
5038+                   n,
5039+                   prefix,
5040                    offsets_tuple)
5041hunk ./src/allmydata/mutable/servermap.py 767
5042+        # This tuple uniquely identifies a particular version of the
5043+        # file; we use it to keep track of the versions we've already seen.
5044 
5045         if verinfo not in self._valid_versions:
5046hunk ./src/allmydata/mutable/servermap.py 771
5047-            # it's a new pair. Verify the signature.
5048-            valid = self._node.get_pubkey().verify(prefix, signature)
5049+            # This is a new version tuple, and we need to validate it
5050+            # against the public key before keeping track of it.
5051+            assert self._node.get_pubkey()
5052+            valid = self._node.get_pubkey().verify(prefix, signature[1])
5053             if not valid:
5054hunk ./src/allmydata/mutable/servermap.py 776
5055-                raise CorruptShareError(peerid, shnum, "signature is invalid")
5056+                raise CorruptShareError(peerid, shnum,
5057+                                        "signature is invalid")
5058 
5059hunk ./src/allmydata/mutable/servermap.py 779
5060-            # ok, it's a valid verinfo. Add it to the list of validated
5061-            # versions.
5062-            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
5063-                     % (seqnum, base32.b2a(root_hash)[:4],
5064-                        idlib.shortnodeid_b2a(peerid), shnum,
5065-                        k, N, segsize, datalength),
5066-                     parent=lp)
5067-            self._valid_versions.add(verinfo)
5068-        # We now know that this is a valid candidate verinfo.
5069+        # ok, it's a valid verinfo. Add it to the list of validated
5070+        # versions.
5071+        self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
5072+                 % (seqnum, base32.b2a(root_hash)[:4],
5073+                    idlib.shortnodeid_b2a(peerid), shnum,
5074+                    k, n, segsize, datalen),
5075+                    parent=lp)
5076+        self._valid_versions.add(verinfo)
5077+        # We now know that this is a valid candidate verinfo. Whether or
5078+        # not this instance of it is valid is a matter for the next
5079+        # statement; at this point, we just know that if we see this
5080+        # version info again, that its signature checks out and that
5081+        # we're okay to skip the signature-checking step.
5082 
5083hunk ./src/allmydata/mutable/servermap.py 793
5084+        # (peerid, shnum) are bound in the method invocation.
5085         if (peerid, shnum) in self._servermap.bad_shares:
5086             # we've been told that the rest of the data in this share is
5087             # unusable, so don't add it to the servermap.
5088hunk ./src/allmydata/mutable/servermap.py 808
5089         self.versionmap.add(verinfo, (shnum, peerid, timestamp))
5090         return verinfo
5091 
5092+
5093     def _deserialize_pubkey(self, pubkey_s):
5094         verifier = rsa.create_verifying_key_from_string(pubkey_s)
5095         return verifier
5096hunk ./src/allmydata/mutable/servermap.py 813
5097 
5098-    def _try_to_extract_privkey(self, data, peerid, shnum, lp):
5099-        try:
5100-            r = unpack_share(data)
5101-        except NeedMoreDataError, e:
5102-            # this share won't help us. oh well.
5103-            offset = e.encprivkey_offset
5104-            length = e.encprivkey_length
5105-            self.log("shnum %d on peerid %s: share was too short (%dB) "
5106-                     "to get the encprivkey; [%d:%d] ought to hold it" %
5107-                     (shnum, idlib.shortnodeid_b2a(peerid), len(data),
5108-                      offset, offset+length),
5109-                     parent=lp)
5110-            # NOTE: if uncoordinated writes are taking place, someone might
5111-            # change the share (and most probably move the encprivkey) before
5112-            # we get a chance to do one of these reads and fetch it. This
5113-            # will cause us to see a NotEnoughSharesError(unable to fetch
5114-            # privkey) instead of an UncoordinatedWriteError . This is a
5115-            # nuisance, but it will go away when we move to DSA-based mutable
5116-            # files (since the privkey will be small enough to fit in the
5117-            # write cap).
5118-
5119-            return
5120-
5121-        (seqnum, root_hash, IV, k, N, segsize, datalen,
5122-         pubkey, signature, share_hash_chain, block_hash_tree,
5123-         share_data, enc_privkey) = r
5124-
5125-        return self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)
5126 
5127     def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
5128hunk ./src/allmydata/mutable/servermap.py 815
5129-
5130+        """
5131+        Given a writekey from a remote server, I validate it against the
5132+        writekey stored in my node. If it is valid, then I set the
5133+        privkey and encprivkey properties of the node.
5134+        """
5135         alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
5136         alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
5137         if alleged_writekey != self._node.get_writekey():
5138hunk ./src/allmydata/mutable/servermap.py 892
5139         self._queries_completed += 1
5140         self._last_failure = f
5141 
5142-    def _got_privkey_results(self, datavs, peerid, shnum, started, lp):
5143-        now = time.time()
5144-        elapsed = now - started
5145-        self._status.add_per_server_time(peerid, "privkey", started, elapsed)
5146-        self._queries_outstanding.discard(peerid)
5147-        if not self._need_privkey:
5148-            return
5149-        if shnum not in datavs:
5150-            self.log("privkey wasn't there when we asked it",
5151-                     level=log.WEIRD, umid="VA9uDQ")
5152-            return
5153-        datav = datavs[shnum]
5154-        enc_privkey = datav[0]
5155-        self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)
5156 
5157     def _privkey_query_failed(self, f, peerid, shnum, lp):
5158         self._queries_outstanding.discard(peerid)
5159hunk ./src/allmydata/mutable/servermap.py 906
5160         self._servermap.problems.append(f)
5161         self._last_failure = f
5162 
5163+
5164     def _check_for_done(self, res):
5165         # exit paths:
5166         #  return self._send_more_queries(outstanding) : send some more queries
5167hunk ./src/allmydata/mutable/servermap.py 912
5168         #  return self._done() : all done
5169         #  return : keep waiting, no new queries
5170-
5171         lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
5172                               "%(outstanding)d queries outstanding, "
5173                               "%(extra)d extra peers available, "
5174hunk ./src/allmydata/mutable/servermap.py 1117
5175         self._servermap.last_update_time = self._started
5176         # the servermap will not be touched after this
5177         self.log("servermap: %s" % self._servermap.summarize_versions())
5178+
5179         eventually(self._done_deferred.callback, self._servermap)
5180 
5181     def _fatal_error(self, f):
5182hunk ./src/allmydata/test/test_mutable.py 651
5183         d.addCallback(_created)
5184         return d
5185 
5186-    def publish_multiple(self):
5187+    def publish_mdmf(self):
5188+        # like publish_one, except that the result is guaranteed to be
5189+        # an MDMF file.
5190+        # self.CONTENTS should have more than one segment.
5191+        self.CONTENTS = "This is an MDMF file" * 100000
5192+        self._storage = FakeStorage()
5193+        self._nodemaker = make_nodemaker(self._storage)
5194+        self._storage_broker = self._nodemaker.storage_broker
5195+        d = self._nodemaker.create_mutable_file(self.CONTENTS, version=1)
5196+        def _created(node):
5197+            self._fn = node
5198+            self._fn2 = self._nodemaker.create_from_cap(node.get_uri())
5199+        d.addCallback(_created)
5200+        return d
5201+
5202+
5203+    def publish_sdmf(self):
5204+        # like publish_one, except that the result is guaranteed to be
5205+        # an SDMF file
5206+        self.CONTENTS = "This is an SDMF file" * 1000
5207+        self._storage = FakeStorage()
5208+        self._nodemaker = make_nodemaker(self._storage)
5209+        self._storage_broker = self._nodemaker.storage_broker
5210+        d = self._nodemaker.create_mutable_file(self.CONTENTS, version=0)
5211+        def _created(node):
5212+            self._fn = node
5213+            self._fn2 = self._nodemaker.create_from_cap(node.get_uri())
5214+        d.addCallback(_created)
5215+        return d
5216+
5217+
5218+    def publish_multiple(self, version=0):
5219         self.CONTENTS = ["Contents 0",
5220                          "Contents 1",
5221                          "Contents 2",
5222hunk ./src/allmydata/test/test_mutable.py 691
5223         self._copied_shares = {}
5224         self._storage = FakeStorage()
5225         self._nodemaker = make_nodemaker(self._storage)
5226-        d = self._nodemaker.create_mutable_file(self.CONTENTS[0]) # seqnum=1
5227+        d = self._nodemaker.create_mutable_file(self.CONTENTS[0], version=version) # seqnum=1
5228         def _created(node):
5229             self._fn = node
5230             # now create multiple versions of the same file, and accumulate
5231hunk ./src/allmydata/test/test_mutable.py 919
5232         return d
5233 
5234 
5235+    def test_servermapupdater_finds_mdmf_files(self):
5236+        # setUp already published an MDMF file for us. We just need to
5237+        # make sure that when we run the ServermapUpdater, the file is
5238+        # reported to have one recoverable version.
5239+        d = defer.succeed(None)
5240+        d.addCallback(lambda ignored:
5241+            self.publish_mdmf())
5242+        d.addCallback(lambda ignored:
5243+            self.make_servermap(mode=MODE_CHECK))
5244+        # Calling make_servermap also updates the servermap in the mode
5245+        # that we specify, so we just need to see what it says.
5246+        def _check_servermap(sm):
5247+            self.failUnlessEqual(len(sm.recoverable_versions()), 1)
5248+        d.addCallback(_check_servermap)
5249+        return d
5250+
5251+
5252+    def test_servermapupdater_finds_sdmf_files(self):
5253+        d = defer.succeed(None)
5254+        d.addCallback(lambda ignored:
5255+            self.publish_sdmf())
5256+        d.addCallback(lambda ignored:
5257+            self.make_servermap(mode=MODE_CHECK))
5258+        d.addCallback(lambda servermap:
5259+            self.failUnlessEqual(len(servermap.recoverable_versions()), 1))
5260+        return d
5261+
5262 
5263 class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin, PublishMixin):
5264     def setUp(self):
5265hunk ./src/allmydata/test/test_mutable.py 1063
5266         return d
5267     test_no_servers_download.timeout = 15
5268 
5269+
5270     def _test_corrupt_all(self, offset, substring,
5271                           should_succeed=False,
5272                           corrupt_early=True,
5273}
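
The hunks above replace the old single-call validation with a per-share pipeline: fetch the verification key (if we don't have it), the version info, the signature, and optionally the encrypted private key, then gather those fetches with a DeferredList and validate in one place. Below is a minimal, self-contained sketch of that gathering pattern, using a stub in place of MDMFSlotReadProxy; the names StubReader and gather_share_info are illustrative only and not part of the patch.

    from twisted.internet import defer

    class StubReader(object):
        # stand-in for MDMFSlotReadProxy: each getter returns a fired Deferred
        def get_verification_key(self):
            return defer.succeed("pubkey bytes")
        def get_verinfo(self):
            return defer.succeed(("verinfo placeholder",))
        def get_signature(self):
            return defer.succeed("signature bytes")
        def get_encprivkey(self):
            return defer.succeed("encrypted privkey")

    def gather_share_info(reader, need_privkey=False):
        # kick off the independent fetches, then gather them so that
        # validation happens in a single callback
        ds = [reader.get_verification_key(),
              reader.get_verinfo(),
              reader.get_signature(),
              reader.get_encprivkey() if need_privkey else defer.succeed(None)]
        dl = defer.DeferredList(ds, fireOnOneErrback=True)
        def _validate(results):
            # DeferredList hands us (success, value) pairs
            pubkey, verinfo, signature, encprivkey = [v for (ok, v) in results]
            # a real implementation would verify the signature against the
            # pubkey here and raise CorruptShareError on a mismatch
            return verinfo
        dl.addCallback(_validate)
        return dl

    d = gather_share_info(StubReader(), need_privkey=True)

The same structure appears once per share in _got_results, with the per-share DeferredLists collected into an outer DeferredList before _check_for_done runs.
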
5274[Make a segmented mutable uploader
5275Kevan Carstensen <kevan@isnotajoke.com>**20100624234410
5276 Ignore-this: 3e5182612083ff3e11593a4edf5de307
5277 
5278 The mutable file uploader should be able to publish files with one
5279 segment and files with multiple segments. This patch makes it do that.
5280 This is still incomplete, and rather ugly -- I need to flesh out error
5281 handling, I need to write tests, and I need to remove some of the uglier
5282 kludges in the process before I can call this done.
5283] {
5284hunk ./src/allmydata/mutable/publish.py 8
5285 from zope.interface import implements
5286 from twisted.internet import defer
5287 from twisted.python import failure
5288-from allmydata.interfaces import IPublishStatus
5289+from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION
5290 from allmydata.util import base32, hashutil, mathutil, idlib, log
5291 from allmydata import hashtree, codec
5292 from allmydata.storage.server import si_b2a
5293hunk ./src/allmydata/mutable/publish.py 19
5294      UncoordinatedWriteError, NotEnoughServersError
5295 from allmydata.mutable.servermap import ServerMap
5296 from allmydata.mutable.layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \
5297-     unpack_checkstring, SIGNED_PREFIX
5298+     unpack_checkstring, SIGNED_PREFIX, MDMFSlotWriteProxy
5299+
5300+KiB = 1024
5301+DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB
5302 
5303 class PublishStatus:
5304     implements(IPublishStatus)
5305hunk ./src/allmydata/mutable/publish.py 112
5306         self._status.set_helper(False)
5307         self._status.set_progress(0.0)
5308         self._status.set_active(True)
5309+        # We use this to control how the file is written.
5310+        version = self._node.get_version()
5311+        assert version in (SDMF_VERSION, MDMF_VERSION)
5312+        self._version = version
5313 
5314     def get_status(self):
5315         return self._status
5316hunk ./src/allmydata/mutable/publish.py 134
5317         simultaneous write.
5318         """
5319 
5320-        # 1: generate shares (SDMF: files are small, so we can do it in RAM)
5321-        # 2: perform peer selection, get candidate servers
5322-        #  2a: send queries to n+epsilon servers, to determine current shares
5323-        #  2b: based upon responses, create target map
5324-        # 3: send slot_testv_and_readv_and_writev messages
5325-        # 4: as responses return, update share-dispatch table
5326-        # 4a: may need to run recovery algorithm
5327-        # 5: when enough responses are back, we're done
5328+        # 0. Setup encoding parameters, encoder, and other such things.
5329+        # 1. Encrypt, encode, and publish segments.
5330 
5331         self.log("starting publish, datalen is %s" % len(newdata))
5332         self._status.set_size(len(newdata))
5333hunk ./src/allmydata/mutable/publish.py 187
5334         self.bad_peers = set() # peerids who have errbacked/refused requests
5335 
5336         self.newdata = newdata
5337-        self.salt = os.urandom(16)
5338 
5339hunk ./src/allmydata/mutable/publish.py 188
5340+        # This will set self.segment_size, self.num_segments, and
5341+        # self.fec.
5342         self.setup_encoding_parameters()
5343 
5344         # if we experience any surprises (writes which were rejected because
5345hunk ./src/allmydata/mutable/publish.py 238
5346             self.bad_share_checkstrings[key] = old_checkstring
5347             self.connections[peerid] = self._servermap.connections[peerid]
5348 
5349-        # create the shares. We'll discard these as they are delivered. SDMF:
5350-        # we're allowed to hold everything in memory.
5351+        # Now, the process diverges -- if this is an SDMF file, we need
5352+        # to write an SDMF file. Otherwise, we need to write an MDMF
5353+        # file.
5354+        if self._version == MDMF_VERSION:
5355+            return self._publish_mdmf()
5356+        else:
5357+            return self._publish_sdmf()
5358+        #return self.done_deferred
5359+
5360+    def _publish_mdmf(self):
5361+        # Next, we find homes for all of the shares that we don't have
5362+        # homes for yet.
5363+        # TODO: Make this part do peer selection.
5364+        self.update_goal()
5365+        self.writers = {}
5366+        # For each (peerid, shnum) in self.goal, we make an
5367+        # MDMFSlotWriteProxy for that peer. We'll use this to write
5368+        # shares to the peer.
5369+        for key in self.goal:
5370+            peerid, shnum = key
5371+            write_enabler = self._node.get_write_enabler(peerid)
5372+            renew_secret = self._node.get_renewal_secret(peerid)
5373+            cancel_secret = self._node.get_cancel_secret(peerid)
5374+            secrets = (write_enabler, renew_secret, cancel_secret)
5375+
5376+            self.writers[shnum] =  MDMFSlotWriteProxy(shnum,
5377+                                                      self.connections[peerid],
5378+                                                      self._storage_index,
5379+                                                      secrets,
5380+                                                      self._new_seqnum,
5381+                                                      self.required_shares,
5382+                                                      self.total_shares,
5383+                                                      self.segment_size,
5384+                                                      len(self.newdata))
5385+            if (peerid, shnum) in self._servermap.servermap:
5386+                old_versionid, old_timestamp = self._servermap.servermap[key]
5387+                (old_seqnum, old_root_hash, old_salt, old_segsize,
5388+                 old_datalength, old_k, old_N, old_prefix,
5389+                 old_offsets_tuple) = old_versionid
5390+                old_checkstring = pack_checkstring(old_seqnum,
5391+                                                   old_root_hash,
5392+                                                   old_salt, 1)
5393+                self.writers[shnum].set_checkstring(old_checkstring)
5394+
5395+        # Now, we start pushing shares.
5396+        self._status.timings["setup"] = time.time() - self._started
5397+        def _start_pushing(res):
5398+            self._started_pushing = time.time()
5399+            return res
5400+
5401+        # First, we encrypt, encode, and publish the shares that we need
5402+        # to encrypt, encode, and publish.
5403+
5404+        # This will eventually hold the block hash chain for each share
5405+        # that we publish. We define it this way so that empty publishes
5406+        # will still have something to write to the remote slot.
5407+        self.blockhashes = dict([(i, []) for i in xrange(self.total_shares)])
5408+        self.sharehash_leaves = None # eventually [sharehashes]
5409+        self.sharehashes = {} # shnum -> [sharehash leaves necessary to
5410+                              # validate the share]
5411 
5412hunk ./src/allmydata/mutable/publish.py 299
5413+        d = defer.succeed(None)
5414+        self.log("Starting push")
5415+        for i in xrange(self.num_segments - 1):
5416+            d.addCallback(lambda ignored, i=i:
5417+                self.push_segment(i))
5418+            d.addCallback(self._turn_barrier)
5419+        # We have at least one segment, so we will have a tail segment
5420+        if self.num_segments > 0:
5421+            d.addCallback(lambda ignored:
5422+                self.push_tail_segment())
5423+
5424+        d.addCallback(lambda ignored:
5425+            self.push_encprivkey())
5426+        d.addCallback(lambda ignored:
5427+            self.push_blockhashes())
5428+        d.addCallback(lambda ignored:
5429+            self.push_salthashes())
5430+        d.addCallback(lambda ignored:
5431+            self.push_sharehashes())
5432+        d.addCallback(lambda ignored:
5433+            self.push_toplevel_hashes_and_signature())
5434+        d.addCallback(lambda ignored:
5435+            self.finish_publishing())
5436+        return d
5437+
5438+
5439+    def _publish_sdmf(self):
5440         self._status.timings["setup"] = time.time() - self._started
5441hunk ./src/allmydata/mutable/publish.py 327
5442+        self.salt = os.urandom(16)
5443+
5444         d = self._encrypt_and_encode()
5445         d.addCallback(self._generate_shares)
5446         def _start_pushing(res):
5447hunk ./src/allmydata/mutable/publish.py 340
5448 
5449         return self.done_deferred
5450 
5451+
5452     def setup_encoding_parameters(self):
5453hunk ./src/allmydata/mutable/publish.py 342
5454-        segment_size = len(self.newdata)
5455+        if self._version == MDMF_VERSION:
5456+            segment_size = DEFAULT_MAX_SEGMENT_SIZE # 128 KiB by default
5457+        else:
5458+            segment_size = len(self.newdata) # SDMF is only one segment
5459         # this must be a multiple of self.required_shares
5460         segment_size = mathutil.next_multiple(segment_size,
5461                                               self.required_shares)
5462hunk ./src/allmydata/mutable/publish.py 355
5463                                                   segment_size)
5464         else:
5465             self.num_segments = 0
5466-        assert self.num_segments in [0, 1,] # SDMF restrictions
5467+        if self._version == SDMF_VERSION:
5468+            assert self.num_segments in (0, 1) # SDMF
5469+            return
5470+        # calculate the tail segment size.
5471+        self.tail_segment_size = len(self.newdata) % segment_size
5472+
5473+        if self.tail_segment_size == 0:
5474+            # The tail segment is the same size as the other segments.
5475+            self.tail_segment_size = segment_size
5476+
5477+        # We'll make an encoder ahead-of-time for the normal-sized
5478+        # segments (defined as any segment of exactly segment_size bytes).
5479+        # (the part of the code that puts the tail segment will make its
5480+        #  own encoder for that part)
5481+        fec = codec.CRSEncoder()
5482+        fec.set_params(self.segment_size,
5483+                       self.required_shares, self.total_shares)
5484+        self.piece_size = fec.get_block_size()
5485+        self.fec = fec
5486+        # This is not technically part of the encoding parameters, but
5487+        # the fact that we are setting up the encoder and encoding
5488+        # parameters is a good indicator that we will soon need it.
5489+        self.salt_hashes = []
5490+
5491+
5492+    def push_segment(self, segnum):
5493+        started = time.time()
5494+        segsize = self.segment_size
5495+        self.log("Pushing segment %d of %d" % (segnum + 1, self.num_segments))
5496+        data = self.newdata[segsize * segnum:segsize*(segnum + 1)]
5497+        assert len(data) == segsize
5498+
5499+        salt = os.urandom(16)
5500+        self.salt_hashes.append(hashutil.mutable_salt_hash(salt))
5501+
5502+        key = hashutil.ssk_readkey_data_hash(salt, self.readkey)
5503+        enc = AES(key)
5504+        crypttext = enc.process(data)
5505+        assert len(crypttext) == len(data)
5506+
5507+        now = time.time()
5508+        self._status.timings["encrypt"] = now - started
5509+        started = now
5510+
5511+        # now apply FEC
5512+
5513+        self._status.set_status("Encoding")
5514+        crypttext_pieces = [None] * self.required_shares
5515+        piece_size = self.piece_size
5516+        for i in range(len(crypttext_pieces)):
5517+            offset = i * piece_size
5518+            piece = crypttext[offset:offset+piece_size]
5519+            piece = piece + "\x00"*(piece_size - len(piece)) # padding
5520+            crypttext_pieces[i] = piece
5521+            assert len(piece) == piece_size
5522+        d = self.fec.encode(crypttext_pieces)
5523+        def _done_encoding(res):
5524+            elapsed = time.time() - started
5525+            self._status.timings["encode"] = elapsed
5526+            return res
5527+        d.addCallback(_done_encoding)
5528+
5529+        def _push_shares_and_salt(results):
5530+            shares, shareids = results
5531+            dl = []
5532+            for i in xrange(len(shares)):
5533+                sharedata = shares[i]
5534+                shareid = shareids[i]
5535+                block_hash = hashutil.block_hash(sharedata)
5536+                self.blockhashes[shareid].append(block_hash)
5537+
5538+                # find the writer for this share
5539+                d = self.writers[shareid].put_block(sharedata, segnum, salt)
5540+                dl.append(d)
5541+            # TODO: Naturally, we need to check on the results of these.
5542+            return defer.DeferredList(dl)
5543+        d.addCallback(_push_shares_and_salt)
5544+        return d
5545+
5546+
5547+    def push_tail_segment(self):
5548+        # This is essentially the same as push_segment, except that we
5549+        # don't use the cached encoder that we use elsewhere.
5550+        self.log("Pushing tail segment")
5551+        started = time.time()
5552+        segsize = self.segment_size
5553+        data = self.newdata[segsize * (self.num_segments-1):]
5554+        assert len(data) == self.tail_segment_size
5555+        salt = os.urandom(16)
5556+        self.salt_hashes.append(hashutil.mutable_salt_hash(salt))
5557+
5558+        key = hashutil.ssk_readkey_data_hash(salt, self.readkey)
5559+        enc = AES(key)
5560+        crypttext = enc.process(data)
5561+        assert len(crypttext) == len(data)
5562+
5563+        now = time.time()
5564+        self._status.timings['encrypt'] = now - started
5565+        started = now
5566+
5567+        self._status.set_status("Encoding")
5568+        tail_fec = codec.CRSEncoder()
5569+        tail_fec.set_params(self.tail_segment_size,
5570+                            self.required_shares,
5571+                            self.total_shares)
5572+
5573+        crypttext_pieces = [None] * self.required_shares
5574+        piece_size = tail_fec.get_block_size()
5575+        for i in range(len(crypttext_pieces)):
5576+            offset = i * piece_size
5577+            piece = crypttext[offset:offset+piece_size]
5578+            piece = piece + "\x00"*(piece_size - len(piece)) # padding
5579+            crypttext_pieces[i] = piece
5580+            assert len(piece) == piece_size
5581+        d = tail_fec.encode(crypttext_pieces)
5582+        def _push_shares_and_salt(results):
5583+            shares, shareids = results
5584+            dl = []
5585+            for i in xrange(len(shares)):
5586+                sharedata = shares[i]
5587+                shareid = shareids[i]
5588+                block_hash = hashutil.block_hash(sharedata)
5589+                self.blockhashes[shareid].append(block_hash)
5590+                # find the writer for this share
5591+                d = self.writers[shareid].put_block(sharedata,
5592+                                                    self.num_segments - 1,
5593+                                                    salt)
5594+                dl.append(d)
5595+            # TODO: Naturally, we need to check on the results of these.
5596+            return defer.DeferredList(dl)
5597+        d.addCallback(_push_shares_and_salt)
5598+        return d
5599+
5600+
5601+    def push_encprivkey(self):
5602+        started = time.time()
5603+        encprivkey = self._encprivkey
5604+        dl = []
5605+        def _spy_on_writer(results):
5606+            print results
5607+            return results
5608+        for shnum, writer in self.writers.iteritems():
5609+            d = writer.put_encprivkey(encprivkey)
5610+            dl.append(d)
5611+        d = defer.DeferredList(dl)
5612+        return d
5613+
5614+
5615+    def push_blockhashes(self):
5616+        started = time.time()
5617+        dl = []
5618+        def _spy_on_results(results):
5619+            print results
5620+            return results
5621+        self.sharehash_leaves = [None] * len(self.blockhashes)
5622+        for shnum, blockhashes in self.blockhashes.iteritems():
5623+            t = hashtree.HashTree(blockhashes)
5624+            self.blockhashes[shnum] = list(t)
5625+            # set the leaf for future use.
5626+            self.sharehash_leaves[shnum] = t[0]
5627+            d = self.writers[shnum].put_blockhashes(self.blockhashes[shnum])
5628+            dl.append(d)
5629+        d = defer.DeferredList(dl)
5630+        return d
5631+
5632+
5633+    def push_salthashes(self):
5634+        started = time.time()
5635+        dl = []
5636+        t = hashtree.HashTree(self.salt_hashes)
5637+        pushing = list(t)
5638+        for shnum in self.writers.iterkeys():
5639+            d = self.writers[shnum].put_salthashes(t)
5640+            dl.append(d)
5641+        dl = defer.DeferredList(dl)
5642+        return dl
5643+
5644+
5645+    def push_sharehashes(self):
5646+        share_hash_tree = hashtree.HashTree(self.sharehash_leaves)
5647+        share_hash_chain = {}
5648+        ds = []
5649+        def _spy_on_results(results):
5650+            print results
5651+            return results
5652+        for shnum in xrange(len(self.sharehash_leaves)):
5653+            needed_indices = share_hash_tree.needed_hashes(shnum)
5654+            self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
5655+                                             for i in needed_indices] )
5656+            d = self.writers[shnum].put_sharehashes(self.sharehashes[shnum])
5657+            ds.append(d)
5658+        self.root_hash = share_hash_tree[0]
5659+        d = defer.DeferredList(ds)
5660+        return d
5661+
5662+
5663+    def push_toplevel_hashes_and_signature(self):
5664+        # We need to do three things here:
5665+        #   - Push the root hash and salt hash
5666+        #   - Get the checkstring of the resulting layout; sign that.
5667+        #   - Push the signature
5668+        ds = []
5669+        def _spy_on_results(results):
5670+            print results
5671+            return results
5672+        for shnum in xrange(self.total_shares):
5673+            d = self.writers[shnum].put_root_hash(self.root_hash)
5674+            ds.append(d)
5675+        d = defer.DeferredList(ds)
5676+        def _make_and_place_signature(ignored):
5677+            signable = self.writers[0].get_signable()
5678+            self.signature = self._privkey.sign(signable)
5679+
5680+            ds = []
5681+            for (shnum, writer) in self.writers.iteritems():
5682+                d = writer.put_signature(self.signature)
5683+                ds.append(d)
5684+            return defer.DeferredList(ds)
5685+        d.addCallback(_make_and_place_signature)
5686+        return d
5687+
5688+
5689+    def finish_publishing(self):
5690+        # We're almost done -- we just need to put the verification key
5691+        # and the offsets
5692+        ds = []
5693+        verification_key = self._pubkey.serialize()
5694+
5695+        def _spy_on_results(results):
5696+            print results
5697+            return results
5698+        for (shnum, writer) in self.writers.iteritems():
5699+            d = writer.put_verification_key(verification_key)
5700+            d.addCallback(lambda ignored, writer=writer:
5701+                writer.finish_publishing())
5702+            ds.append(d)
5703+        return defer.DeferredList(ds)
5704+
5705+
5706+    def _turn_barrier(self, res):
5707+        # putting this method in a Deferred chain imposes a guaranteed
5708+        # reactor turn between the pre- and post- portions of that chain.
5709+        # This can be useful to limit memory consumption: since Deferreds do
5710+        # not do tail recursion, code which uses defer.succeed(result) for
5711+        # consistency will cause objects to live for longer than you might
5712+        # normally expect.
5713+        return fireEventually(res)
5714+
5715 
5716     def _fatal_error(self, f):
5717         self.log("error during loop", failure=f, level=log.UNUSUAL)
5718hunk ./src/allmydata/mutable/publish.py 739
5719             self.log_goal(self.goal, "after update: ")
5720 
5721 
5722-
5723     def _encrypt_and_encode(self):
5724         # this returns a Deferred that fires with a list of (sharedata,
5725         # sharenum) tuples. TODO: cache the ciphertext, only produce the
5726hunk ./src/allmydata/mutable/publish.py 780
5727         d.addCallback(_done_encoding)
5728         return d
5729 
5730+
5731     def _generate_shares(self, shares_and_shareids):
5732         # this sets self.shares and self.root_hash
5733         self.log("_generate_shares")
5734hunk ./src/allmydata/mutable/publish.py 1168
5735             self._status.set_progress(1.0)
5736         eventually(self.done_deferred.callback, res)
5737 
5738-
5739hunk ./src/allmydata/test/test_mutable.py 226
5740         d.addCallback(_created)
5741         return d
5742 
5743+
5744+    def test_create_mdmf(self):
5745+        d = self.nodemaker.create_mutable_file(version=MDMF_VERSION)
5746+        def _created(n):
5747+            self.failUnless(isinstance(n, MutableFileNode))
5748+            self.failUnlessEqual(n.get_storage_index(), n._storage_index)
5749+            sb = self.nodemaker.storage_broker
5750+            peer0 = sorted(sb.get_all_serverids())[0]
5751+            shnums = self._storage._peers[peer0].keys()
5752+            self.failUnlessEqual(len(shnums), 1)
5753+        d.addCallback(_created)
5754+        return d
5755+
5756+
5757     def test_serialize(self):
5758         n = MutableFileNode(None, None, {"k": 3, "n": 10}, None)
5759         calls = []
5760hunk ./src/allmydata/test/test_mutable.py 349
5761         d.addCallback(_created)
5762         return d
5763 
5764+
5765+    def test_create_mdmf_with_initial_contents(self):
5766+        initial_contents = "foobarbaz" * 131072 # about 1.1 MiB
5767+        d = self.nodemaker.create_mutable_file(initial_contents,
5768+                                               version=MDMF_VERSION)
5769+        def _created(n):
5770+            d = n.download_best_version()
5771+            d.addCallback(lambda data:
5772+                self.failUnlessEqual(data, initial_contents))
5773+            d.addCallback(lambda ignored:
5774+                n.overwrite(initial_contents + "foobarbaz"))
5775+            d.addCallback(lambda ignored:
5776+                n.download_best_version())
5777+            d.addCallback(lambda data:
5778+                self.failUnlessEqual(data, initial_contents +
5779+                                           "foobarbaz"))
5780+            return d
5781+        d.addCallback(_created)
5782+        return d
5783+
5784+
5785     def test_create_with_initial_contents_function(self):
5786         data = "initial contents"
5787         def _make_contents(n):
5788hunk ./src/allmydata/test/test_mutable.py 385
5789         d.addCallback(lambda data2: self.failUnlessEqual(data2, data))
5790         return d
5791 
5792+
5793+    def test_create_mdmf_with_initial_contents_function(self):
5794+        data = "initial contents" * 100000
5795+        def _make_contents(n):
5796+            self.failUnless(isinstance(n, MutableFileNode))
5797+            key = n.get_writekey()
5798+            self.failUnless(isinstance(key, str), key)
5799+            self.failUnlessEqual(len(key), 16)
5800+            return data
5801+        d = self.nodemaker.create_mutable_file(_make_contents,
5802+                                               version=MDMF_VERSION)
5803+        d.addCallback(lambda n:
5804+            n.download_best_version())
5805+        d.addCallback(lambda data2:
5806+            self.failUnlessEqual(data2, data))
5807+        return d
5808+
5809+
5810     def test_create_with_too_large_contents(self):
5811         BIG = "a" * (self.OLD_MAX_SEGMENT_SIZE + 1)
5812         d = self.nodemaker.create_mutable_file(BIG)
5813}
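
setup_encoding_parameters above rounds the segment size up to a multiple of the required-share count, counts segments, and computes the tail-segment size. Here is a small standalone sketch of that arithmetic; plan_segments is a hypothetical helper used only for illustration, with the same 128 KiB default the patch introduces.

    KiB = 1024
    DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB

    def plan_segments(datalen, k, max_segment_size=DEFAULT_MAX_SEGMENT_SIZE):
        # the segment size must be a multiple of the number of required shares
        segment_size = max_segment_size
        if segment_size % k:
            segment_size += k - (segment_size % k)
        if datalen:
            num_segments = (datalen + segment_size - 1) // segment_size
        else:
            num_segments = 0
        # the tail segment holds whatever is left over; if the data divides
        # evenly, the tail segment is simply a full-sized segment
        tail_size = datalen % segment_size
        if tail_size == 0 and num_segments:
            tail_size = segment_size
        return segment_size, num_segments, tail_size

    # e.g. a 300 KiB file with k=3: three segments of 131073 bytes, with
    # 45054 bytes of real data in the tail segment
    print(plan_segments(300 * KiB, 3))
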
5814[Misc. changes to support the work I'm doing
5815Kevan Carstensen <kevan@isnotajoke.com>**20100624234637
5816 Ignore-this: fdd18fa8cc05f4b4b15ff53ee24a1819
5817 
5818     - Add a notion of file version number to interfaces.py
5819     - Alter mutable file node interfaces to have a notion of version,
5820       though this may be changed later.
5821     - Alter mutable/filenode.py to conform to these changes.
5822     - Add a salt hasher to util/hashutil.py
5823] {
5824hunk ./src/allmydata/interfaces.py 7
5825      ChoiceOf, IntegerConstraint, Any, RemoteInterface, Referenceable
5826 
5827 HASH_SIZE=32
5828+SALT_SIZE=16
5829+
5830+SDMF_VERSION=0
5831+MDMF_VERSION=1
5832 
5833 Hash = StringConstraint(maxLength=HASH_SIZE,
5834                         minLength=HASH_SIZE)# binary format 32-byte SHA256 hash
5835hunk ./src/allmydata/interfaces.py 811
5836         writer-visible data using this writekey.
5837         """
5838 
5839+    def set_version(version):
5840+        """Tahoe-LAFS supports SDMF and MDMF mutable files. By default,
5841+        we upload in SDMF for reasons of compatibility. If you want to
5842+        change this, set_version will let you do that.
5843+
5844+        To say that this file should be uploaded as SDMF, pass in
5845+        SDMF_VERSION (0); to upload it as MDMF, pass in MDMF_VERSION (1).
5846+        """
5847+
5848+    def get_version():
5849+        """Returns the mutable file protocol version."""
5850+
5851 class NotEnoughSharesError(Exception):
5852     """Download was unable to get enough shares"""
5853 
5854hunk ./src/allmydata/mutable/filenode.py 8
5855 from twisted.internet import defer, reactor
5856 from foolscap.api import eventually
5857 from allmydata.interfaces import IMutableFileNode, \
5858-     ICheckable, ICheckResults, NotEnoughSharesError
5859+     ICheckable, ICheckResults, NotEnoughSharesError, MDMF_VERSION, SDMF_VERSION
5860 from allmydata.util import hashutil, log
5861 from allmydata.util.assertutil import precondition
5862 from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI
5863hunk ./src/allmydata/mutable/filenode.py 67
5864         self._sharemap = {} # known shares, shnum-to-[nodeids]
5865         self._cache = ResponseCache()
5866         self._most_recent_size = None
5867+        # filled in after __init__ if we're being created for the first time;
5868+        # filled in by the servermap updater before publishing, otherwise.
5869+        # set to this default value in case neither of those things happen,
5870+        # or in case the servermap can't find any shares to tell us what
5871+        # to publish as.
5872+        # TODO: Set this back to None, and find out why the tests fail
5873+        #       with it set to None.
5874+        self._protocol_version = SDMF_VERSION
5875 
5876         # all users of this MutableFileNode go through the serializer. This
5877         # takes advantage of the fact that Deferreds discard the callbacks
5878hunk ./src/allmydata/mutable/filenode.py 472
5879     def _did_upload(self, res, size):
5880         self._most_recent_size = size
5881         return res
5882+
5883+
5884+    def set_version(self, version):
5885+        # I can be set in two ways:
5886+        #  1. When the node is created.
5887+        #  2. (for an existing share) when the Servermap is updated
5888+        #     before I am read.
5889+        assert version in (MDMF_VERSION, SDMF_VERSION)
5890+        self._protocol_version = version
5891+
5892+
5893+    def get_version(self):
5894+        return self._protocol_version
5895hunk ./src/allmydata/util/hashutil.py 90
5896 MUTABLE_READKEY_TAG = "allmydata_mutable_writekey_to_readkey_v1"
5897 MUTABLE_DATAKEY_TAG = "allmydata_mutable_readkey_to_datakey_v1"
5898 MUTABLE_STORAGEINDEX_TAG = "allmydata_mutable_readkey_to_storage_index_v1"
5899+MUTABLE_SALT_TAG = "allmydata_mutable_segment_salt_v1"
5900 
5901 # dirnodes
5902 DIRNODE_CHILD_WRITECAP_TAG = "allmydata_mutable_writekey_and_salt_to_dirnode_child_capkey_v1"
5903hunk ./src/allmydata/util/hashutil.py 134
5904 def plaintext_segment_hasher():
5905     return tagged_hasher(PLAINTEXT_SEGMENT_TAG)
5906 
5907+def mutable_salt_hash(data):
5908+    return tagged_hash(MUTABLE_SALT_TAG, data)
5909+def mutable_salt_hasher():
5910+    return tagged_hasher(MUTABLE_SALT_TAG)
5911+
5912 KEYLEN = 16
5913 IVLEN = 16
5914 
5915}
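
mutable_salt_hash is just a tagged hash over the 16-byte per-segment salt. As a rough, self-contained sketch of what such a tagged hash looks like -- a single SHA-256 over a netstring-prefixed tag, which is an assumption here; the project's tagged_hash may differ in detail (truncation, double hashing):

    import hashlib

    MUTABLE_SALT_TAG = b"allmydata_mutable_segment_salt_v1"

    def netstring(s):
        return str(len(s)).encode("ascii") + b":" + s + b","

    def mutable_salt_hash(salt):
        # the tag keeps salt hashes domain-separated from hashes computed
        # over the same bytes for other purposes
        return hashlib.sha256(netstring(MUTABLE_SALT_TAG) + salt).digest()

    print(len(mutable_salt_hash(b"\x00" * 16)))   # 32-byte digest
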
5916[nodemaker.py: create MDMF files when asked to
5917Kevan Carstensen <kevan@isnotajoke.com>**20100624234833
5918 Ignore-this: 26c16aaca9ddab7a7ce37a4530bc970
5919] {
5920hunk ./src/allmydata/nodemaker.py 3
5921 import weakref
5922 from zope.interface import implements
5923-from allmydata.interfaces import INodeMaker
5924+from allmydata.util.assertutil import precondition
5925+from allmydata.interfaces import INodeMaker, MustBeDeepImmutableError, \
5926+                                 SDMF_VERSION, MDMF_VERSION
5927 from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
5928 from allmydata.immutable.upload import Data
5929 from allmydata.mutable.filenode import MutableFileNode
5930hunk ./src/allmydata/nodemaker.py 92
5931             return self._create_dirnode(filenode)
5932         return None
5933 
5934-    def create_mutable_file(self, contents=None, keysize=None):
5935+    def create_mutable_file(self, contents=None, keysize=None,
5936+                            version=SDMF_VERSION):
5937         n = MutableFileNode(self.storage_broker, self.secret_holder,
5938                             self.default_encoding_parameters, self.history)
5939hunk ./src/allmydata/nodemaker.py 96
5940+        n.set_version(version)
5941         d = self.key_generator.generate(keysize)
5942         d.addCallback(n.create_with_keys, contents)
5943         d.addCallback(lambda res: n)
5944hunk ./src/allmydata/nodemaker.py 102
5945         return d
5946 
5947-    def create_new_mutable_directory(self, initial_children={}):
5948+    def create_new_mutable_directory(self, initial_children={},
5949+                                     version=SDMF_VERSION):
5950+        # initial_children must have metadata (i.e. {} instead of None)
5951+        for (name, (node, metadata)) in initial_children.iteritems():
5952+            precondition(isinstance(metadata, dict),
5953+                         "create_new_mutable_directory requires metadata to be a dict, not None", metadata)
5954+            node.raise_error()
5955         d = self.create_mutable_file(lambda n:
5956hunk ./src/allmydata/nodemaker.py 110
5957-                                     pack_children(n, initial_children))
5958+                                     pack_children(n, initial_children),
5959+                                     version=version)
5960         d.addCallback(self._create_dirnode)
5961         return d
5962 
5963}
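
SDMF remains the default version here, so existing callers of
create_mutable_file and create_new_mutable_directory are unaffected; only
callers that ask for MDMF_VERSION explicitly get the new format. A rough
usage sketch (the nodemaker variable stands in for an existing NodeMaker
instance and is not part of the patch):

    from allmydata.interfaces import SDMF_VERSION, MDMF_VERSION

    # unchanged callers still create SDMF files:
    d = nodemaker.create_mutable_file("initial contents")

    # callers that want the new format ask for it explicitly:
    d = nodemaker.create_mutable_file("initial contents",
                                      version=MDMF_VERSION)
    d.addCallback(lambda node: node.get_version())   # fires with MDMF_VERSION
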
5964[storage/server.py: minor code cleanup
5965Kevan Carstensen <kevan@isnotajoke.com>**20100624234905
5966 Ignore-this: 2358c531c39e48d3c8e56b62b5768228
5967] {
5968hunk ./src/allmydata/storage/server.py 569
5969                                          self)
5970         return share
5971 
5972-    def remote_slot_readv(self, storage_index, shares, readv):
5973+    def remote_slot_readv(self, storage_index, shares, readvs):
5974         start = time.time()
5975         self.count("readv")
5976         si_s = si_b2a(storage_index)
5977hunk ./src/allmydata/storage/server.py 590
5978             if sharenum in shares or not shares:
5979                 filename = os.path.join(bucketdir, sharenum_s)
5980                 msf = MutableShareFile(filename, self)
5981-                datavs[sharenum] = msf.readv(readv)
5982+                datavs[sharenum] = msf.readv(readvs)
5983         log.msg("returning shares %s" % (datavs.keys(),),
5984                 facility="tahoe.storage", level=log.NOISY, parent=lp)
5985         self.add_latency("readv", time.time() - start)
5986}
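
The readv -> readvs rename is cosmetic: the argument is a list of read
vectors, one (offset, length) pair per read, and the plural name makes that
explicit. A rough sketch of a client-side call (the server reference,
storage index, and offsets are placeholders, not part of the patch):

    readvs = [(0, 107), (819, 32)]       # e.g. a header read and a hash read
    d = server_rref.callRemote("slot_readv",
                               storage_index,
                               [],        # an empty share list means "all shares"
                               readvs)
    # fires with a dict mapping share number to a list of strings,
    # one string per read vector
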
5987[test/test_mutable.py: alter some tests that were failing due to MDMF; minor code cleanup.
5988Kevan Carstensen <kevan@isnotajoke.com>**20100624234924
5989 Ignore-this: afb86ec1fbdbfe1a5ef6f46f350273c0
5990] {
5991hunk ./src/allmydata/test/test_mutable.py 152
5992             chr(ord(original[byte_offset]) ^ 0x01) +
5993             original[byte_offset+1:])
5994 
5995+def add_two(original, byte_offset):
5996+    # It isn't enough to simply flip the bit for the version number,
5997+    # because 1 is a valid version number. So we add two instead.
5998+    return (original[:byte_offset] +
5999+            chr(ord(original[byte_offset]) ^ 0x02) +
6000+            original[byte_offset+1:])
6001+
6002 def corrupt(res, s, offset, shnums_to_corrupt=None, offset_offset=0):
6003     # if shnums_to_corrupt is None, corrupt all shares. Otherwise it is a
6004     # list of shnums to corrupt.
6005hunk ./src/allmydata/test/test_mutable.py 188
6006                 real_offset = offset1
6007             real_offset = int(real_offset) + offset2 + offset_offset
6008             assert isinstance(real_offset, int), offset
6009-            shares[shnum] = flip_bit(data, real_offset)
6010+            if offset1 == 0: # verbyte
6011+                f = add_two
6012+            else:
6013+                f = flip_bit
6014+            shares[shnum] = f(data, real_offset)
6015     return res
6016 
6017 def make_storagebroker(s=None, num_peers=10):
6018hunk ./src/allmydata/test/test_mutable.py 514
6019         d.addCallback(_created)
6020         return d
6021 
6022+
6023     def test_modify_backoffer(self):
6024         def _modifier(old_contents, servermap, first_time):
6025             return old_contents + "line2"
6026hunk ./src/allmydata/test/test_mutable.py 780
6027         d.addCallback(_created)
6028         return d
6029 
6030+
6031     def _copy_shares(self, ignored, index):
6032         shares = self._storage._peers
6033         # we need a deep copy
6034}
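
A small illustration (not part of the patch) of why corrupt() now treats the
verbyte specially: the valid version bytes are 0 (SDMF) and 1 (MDMF), so
flipping the low bit can turn one valid verbyte into the other and the
corruption would pass unnoticed, whereas flipping the 0x02 bit (which, for
the values 0 and 1, is the same as adding two) always produces an invalid
version:

    for verbyte in ("\x00", "\x01"):           # SDMF, MDMF
        flipped = chr(ord(verbyte) ^ 0x01)     # what flip_bit would produce
        added   = chr(ord(verbyte) ^ 0x02)     # what add_two produces
        print ord(verbyte), "->", ord(flipped), ord(added)
    # 0 -> 1 2   (flip_bit yields the valid MDMF verbyte; add_two does not)
    # 1 -> 0 3   (flip_bit yields the valid SDMF verbyte; add_two does not)
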
6035
6036Context:
6037
6038[docs: about.html link to home page early on, and be decentralized storage instead of cloud storage this time around
6039zooko@zooko.com**20100619065318
6040 Ignore-this: dc6db03f696e5b6d2848699e754d8053
6041] 
6042[docs: update about.html, especially to have a non-broken link to quickstart.html, and also to comment out the broken links to "for Paranoids" and "for Corporates"
6043zooko@zooko.com**20100619065124
6044 Ignore-this: e292c7f51c337a84ebfeb366fbd24d6c
6045] 
6046[TAG allmydata-tahoe-1.7.0
6047zooko@zooko.com**20100619052631
6048 Ignore-this: d21e27afe6d85e2e3ba6a3292ba2be1
6049] 
6050Patch bundle hash:
6051f3f68c46851bd3e28b15474d64378882236d055e