source: trunk/src/allmydata/immutable/offloaded.py

Last change on this file was fec97256, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2025-01-06T21:51:37Z

trim Python2 syntax

  • Property mode set to 100644
File size: 27.4 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5import os, stat, time, weakref
6from zope.interface import implementer
7from twisted.internet import defer
8from foolscap.api import Referenceable, DeadReferenceError, eventually
9import allmydata # for __full_version__
10from allmydata import interfaces, uri
11from allmydata.storage.server import si_b2a
12from allmydata.immutable import upload
13from allmydata.immutable.layout import ReadBucketProxy
14from allmydata.util.assertutil import precondition
15from allmydata.util import log, observer, fileutil, hashutil, dictutil
16
17
class NotEnoughWritersError(Exception):
    """Raised when a remote call is needed but no assisted uploader is left
    to ask."""
20
21
class CHKCheckerAndUEBFetcher:
    """Decide whether a file is already fully present in the grid, and
    retrieve its URI Extension Block (UEB) while doing so. The UEB is useful
    for an uploading client who wants to skip the work of encryption and
    encoding.

    My result is False when the file is not completely healthy, i.e. when
    fewer than 'N' shares were located or no UEB could be obtained.

    When the file is completely healthy, my result is a tuple of (sharemap,
    UEB_data, UEB_hash). The sharemap is a dict keyed by share number whose
    values are sets of the ids of the servers holding that share.
    """

    def __init__(self, peer_getter, storage_index, logparent):
        # inputs supplied by the caller
        self._peer_getter = peer_getter
        self._storage_index = storage_index
        self._logparent = logparent
        # state accumulated from the servers' get_buckets() responses
        self._found_shares = set()  # share numbers seen on any server
        self._sharemap = dictutil.DictOfSets()  # shnum -> serverids
        self._readers = set()  # (bucket, server) pairs
        # filled in by _got_uri_extension()
        self._ueb_data = None
        self._ueb_hash = None

    def log(self, *args, **kwargs):
        """Emit a log message, defaulting the facility and parent."""
        kwargs.setdefault('facility', "tahoe.helper.chk.checkandUEBfetch")
        kwargs.setdefault('parent', self._logparent)
        return log.msg(*args, **kwargs)

    def check(self):
        """
        :return Deferred[bool|(DictOfSets, dict, bytes)]: Fires with
            ``False`` if no UEB could be retrieved or if fewer than N shares
            were found.  Otherwise fires with a tuple of the sharemap, the
            UEB data, and the UEB hash.
        """
        shareholders = self._get_all_shareholders(self._storage_index)
        return shareholders.addCallback(
            self._get_uri_extension
        ).addCallback(self._done)

    def _get_all_shareholders(self, storage_index):
        """Ask every server from the peer getter for its buckets; return a
        DeferredList covering all of those queries."""
        def _query(server):
            d = server.get_storage_server().get_buckets(storage_index)
            d.addCallbacks(self._got_response, self._got_error,
                           callbackArgs=(server,))
            return d
        return defer.DeferredList(
            [_query(server) for server in self._peer_getter(storage_index)]
        )

    def _got_response(self, buckets, server):
        """Record one server's answer.

        ``buckets`` is a dict mapping shnum to an rref of the bucket on the
        server that holds it.
        """
        shnums_s = ",".join(str(shnum) for shnum in buckets)
        self.log("got_response: [%r] has %d shares (%s)" %
                 (server.get_name(), len(buckets), shnums_s),
                 level=log.NOISY)
        for shnum, bucket in buckets.items():
            self._found_shares.add(shnum)
            self._sharemap.add(shnum, server.get_serverid())
            self._readers.add((bucket, server))

    def _got_error(self, f):
        """Swallow a failed query; anything other than a dead reference is
        also reported through log.err."""
        if not f.check(DeadReferenceError):
            log.err(f, parent=self._logparent)

    def _get_uri_extension(self, res):
        """Fetch the UEB from one of the buckets we learned about."""
        # We assume the UEB can be pulled from any share, so only a single
        # reader is tried. If that fetch fails, the whole file is treated as
        # unavailable.
        if self._readers:
            bucket, server = self._readers.pop()
            proxy = ReadBucketProxy(bucket, server,
                                    si_b2a(self._storage_index))
            d = proxy.get_uri_extension()
            d.addCallback(self._got_uri_extension)
            d.addErrback(self._ueb_error)
            return d
        self.log("no readers, so no UEB", level=log.NOISY)
        return None

    def _got_uri_extension(self, ueb):
        """Remember the hash and the unpacked contents of the UEB."""
        self.log("_got_uri_extension", level=log.NOISY)
        self._ueb_hash = hashutil.uri_extension_hash(ueb)
        self._ueb_data = uri.unpack_extension(ueb)

    def _ueb_error(self, f):
        """Log a failed UEB fetch and absorb the failure."""
        # A failure here means the file is unavailable, but the check as a
        # whole should still complete rather than errback.
        self.log("UEB fetch failed", failure=f, level=log.WEIRD, umid="sJLKVg")
        return None

    def _done(self, res):
        """Turn the accumulated state into the final result of check()."""
        if not self._ueb_data:
            # without a UEB we cannot vouch for the file at all
            self.log("unable to find UEB data, file not found in grid",
                     level=log.NOISY)
            return False

        found = len(self._found_shares)
        total = self._ueb_data['total_shares']
        self.log(format="got %(found)d shares of %(total)d",
                 found=found, total=total, level=log.NOISY)
        if found >= total:
            # every share is present
            self.log("all shares present, file is found in grid",
                     level=log.NOISY)
            return (self._sharemap, self._ueb_data, self._ueb_hash)

        # some shares are missing from the grid
        self.log("not enough to qualify, file not found in grid",
                 level=log.NOISY)
        return False
133
134
@implementer(interfaces.RICHKUploadHelper)
class CHKUploadHelper(Referenceable, upload.CHKUploader):  # type: ignore # warner/foolscap#78
    """I am the helper-server-side counterpart to AssistedUploader. Peer
    selection, encoding, and share pushing happen here; the ciphertext
    itself is read from the remote AssistedUploader.
    """
    VERSION = {
        b"http://allmydata.org/tahoe/protocols/helper/chk-upload/v1": {},
        b"application-version": allmydata.__full_version__.encode("utf-8"),
    }

    def __init__(self, storage_index,
                 helper, storage_broker, secret_holder,
                 incoming_file, encoding_file,
                 log_number):
        upload.CHKUploader.__init__(self, storage_broker, secret_holder)
        self._storage_index = storage_index
        self._helper = helper
        self._incoming_file = incoming_file
        self._encoding_file = encoding_file
        # short form of the storage index, used in log messages
        self._upload_id = si_b2a(storage_index)[:5]
        self._log_number = log_number

        status = upload.UploadStatus()
        status.set_helper(False)
        status.set_storage_index(storage_index)
        status.set_status("fetching ciphertext")
        status.set_progress(0, 1.0)
        self._upload_status = status

        self._helper.log("CHKUploadHelper starting for SI %r" % self._upload_id,
                         parent=log_number)

        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        # the fetcher reads self._upload_id, so it must be built after that
        # attribute exists
        self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file,
                                             self._log_number)
        self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
        self._finished_observers = observer.OneShotObserverList()

        self._started = time.time()

        # Pipeline: wait for the ciphertext to be fetched, open the local
        # reader, run the encrypted upload, then report the outcome.
        def _open_reader(res):
            return self._reader.start()

        def _encode_and_push(res):
            return self.start_encrypted(self._reader)

        d = self._fetcher.when_done()
        d.addCallback(_open_reader)
        d.addCallback(_encode_and_push)
        d.addCallback(self._finished)
        d.addErrback(self._failed)

    def log(self, *args, **kwargs):
        """Log through CHKUploader.log, defaulting the facility."""
        kwargs.setdefault('facility', "tahoe.helper.chk")
        return upload.CHKUploader.log(self, *args, **kwargs)

    def remote_get_version(self):
        """Return the VERSION dictionary."""
        return self.VERSION

    def remote_upload(self, reader):
        """Register ``reader`` (an RIEncryptedUploadable) as a source of
        ciphertext and hashes.

        I am specified to return an UploadResults dictionary; what I return
        is a Deferred from the finished-observer list, which fires when the
        upload finishes or fails.
        """
        # Report what we already hold on disk for this upload.
        self.log("deciding whether to upload the file or not", level=log.NOISY)
        if os.path.exists(self._encoding_file):
            # The whole file is here. We may be encoding it right now, or an
            # earlier encode/upload may have failed and needs restarting.
            self.log("ciphertext already in place", level=log.UNUSUAL)
        elif os.path.exists(self._incoming_file):
            # Only part of the file is here (otherwise we would be
            # encoding), so this caller may be able to supply the rest.
            self.log("partial ciphertext already present", level=log.UNUSUAL)
        else:
            # nothing on disk: we have no record of uploading this file
            self.log("no ciphertext yet", level=log.NOISY)

        # the fetcher pulls ciphertext from the reader ...
        self._fetcher.add_reader(reader)
        # ... and the local reader uses it for hashes
        self._reader.add_reader(reader)

        # the client learns about completion through this Deferred
        return self._finished_observers.when_fired()

    def _finished(self, ur):
        """Convert the upload results into HelperUploadResults, clean up the
        encoding file, and notify the observers and the Helper."""
        assert interfaces.IUploadResults.providedBy(ur), ur
        vcapstr = ur.get_verifycapstr()
        precondition(isinstance(vcapstr, bytes), vcapstr)
        v = uri.from_string(vcapstr)
        fetch_times = self._fetcher.get_times()

        hur = upload.HelperUploadResults()
        hur.timings = {
            "cumulative_fetch": fetch_times["cumulative_fetch"],
            "total_fetch": fetch_times["total"],
        }
        hur.timings.update(ur.get_timings())
        hur.uri_extension_hash = v.uri_extension_hash
        hur.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
        hur.preexisting_shares = ur.get_preexisting_shares()
        # hur.sharemap needs to be {shnum: set(serverid)}
        hur.sharemap = {
            shnum: {s.get_serverid() for s in servers}
            for shnum, servers in ur.get_sharemap().items()
        }
        # and hur.servermap needs to be {serverid: set(shnum)}
        hur.servermap = {
            server.get_serverid(): set(shnums)
            for server, shnums in ur.get_servermap().items()
        }
        hur.pushed_shares = ur.get_pushed_shares()
        hur.file_size = ur.get_file_size()
        hur.uri_extension_data = ur.get_uri_extension_data()
        hur.verifycapstr = vcapstr

        # the ciphertext has been pushed, so the local copy can go
        self._reader.close()
        os.unlink(self._encoding_file)
        self._finished_observers.fire(hur)
        self._helper.upload_finished(self._storage_index, v.size)
        del self._reader

    def _failed(self, f):
        """Log the failure, pass it to the observers, and tell the Helper
        that this upload is over (reporting a size of 0)."""
        self.log(format="CHKUploadHelper(%(si)s) failed",
                 si=si_b2a(self._storage_index)[:5],
                 failure=f,
                 level=log.UNUSUAL)
        self._finished_observers.fire(f)
        self._helper.upload_finished(self._storage_index, 0)
        del self._reader
256
class AskUntilSuccessMixin:
    """Mixin that sends a remote call to the first known reader and falls
    back to the next one whenever a call fails.
    """
    # The class using me must provide a '_readers' list and an
    # '_upload_helper' attribute (used for logging failures).
    _last_failure = None

    def add_reader(self, reader):
        """Append ``reader`` to the list of readers that call() may use."""
        self._readers.append(reader)

    def call(self, *args, **kwargs):
        """Invoke callRemote(*args, **kwargs) on the first reader.

        If that call fails, the reader is dropped and the same call is
        retried on whichever reader is then first.

        :raise NotEnoughWritersError: if there is no reader left to ask.
        """
        if not self._readers:
            raise NotEnoughWritersError("ran out of assisted uploaders, last failure was %s" % self._last_failure)
        rr = self._readers[0]

        def _retry_elsewhere(f):
            self._last_failure = f
            try:
                self._readers.remove(rr)
            except ValueError:
                # the reader is already gone from the list
                pass
            self._upload_helper.log("call to assisted uploader %s failed" % rr,
                                    failure=f, level=log.UNUSUAL)
            # somebody else may still be able to answer
            return self.call(*args, **kwargs)

        d = rr.callRemote(*args, **kwargs)
        d.addErrback(_retry_elsewhere)
        return d
279
class CHKCiphertextFetcher(AskUntilSuccessMixin):
    """I gather ciphertext on disk from one or more remote
    RIEncryptedUploadable instances. The file I produce can then be used by
    a LocalCiphertextReader to satisfy the ciphertext needs of a CHK upload
    process.

    Fetching starts as soon as a reader is added. A reader is removed when it
    has any sort of error. If the last reader is removed, my when_done()
    Deferred fires with a failure.

    My when_done() Deferred fires (with None) immediately after the
    ciphertext has been moved to 'encoded_file'.
    """

    def __init__(self, helper, incoming_file, encoded_file, logparent):
        self._upload_helper = helper
        self._upload_id = helper._upload_id
        self._incoming_file = incoming_file
        self._encoding_file = encoded_file
        self._log_parent = logparent
        self._done_observers = observer.OneShotObserverList()
        self._readers = []
        self._started = False
        # open handle on the incoming file while a fetch is in progress
        self._f = None
        self._ciphertext_fetched = 0
        self._times = {
            "cumulative_fetch": 0.0,
            "total": 0.0,
        }

    def log(self, *args, **kwargs):
        """Emit a log message, defaulting the facility and parent."""
        kwargs.setdefault("facility", "tahoe.helper.chkupload.fetch")
        kwargs.setdefault("parent", self._log_parent)
        return log.msg(*args, **kwargs)

    def add_reader(self, reader):
        """Remember ``reader`` and schedule the fetch to start."""
        AskUntilSuccessMixin.add_reader(self, reader)
        eventually(self._start)

    def _start(self):
        """Begin fetching; only the first invocation does anything."""
        if self._started:
            return
        self._started = True
        started = time.time()

        if not os.path.exists(self._encoding_file):
            # learn the total size first, then pull the data
            d = self.call("get_size")
            d.addCallback(self._got_size)
            d.addCallback(self._start_reading)
            d.addCallback(self._done)
        else:
            self.log("ciphertext already present, bypassing fetch",
                     level=log.UNUSUAL)
            d = defer.succeed(None)
        d.addCallback(self._done2, started)
        d.addErrback(self._failed)

    def _got_size(self, size):
        """Record the total ciphertext size reported by the reader."""
        self.log("total size is %d bytes" % size, level=log.NOISY)
        self._upload_helper._upload_status.set_size(size)
        self._expected_size = size

    def _start_reading(self, res):
        """Open the incoming file for appending and start the fetch loop.

        Returns a Deferred that fires once the last byte has been written to
        self._f.
        """
        # work out how much crypttext is already on disk
        if not os.path.exists(self._incoming_file):
            self._have = 0
            self.log("we do not have any ciphertext yet", level=log.NOISY)
        else:
            self._have = os.stat(self._incoming_file)[stat.ST_SIZE]
            self._upload_helper._helper.count("chk_upload_helper.resumes")
            self.log("we already have %d bytes" % self._have, level=log.NOISY)
        self.log("starting ciphertext fetch", level=log.NOISY)
        self._f = open(self._incoming_file, "ab")

        # pull the remaining data from the readers, one chunk at a time
        all_written = defer.Deferred()
        self._loop(all_written)
        return all_written

    # Data is read in 50kB chunks. A more considered number should be chosen
    # here, possibly letting the client specify it. The goal should be to
    # keep the RTT*bandwidth below 10% of the chunk size, to limit the
    # upload bandwidth lost because this protocol is non-windowing. Too
    # large, however, means more memory consumption at both ends. Something
    # that can be transferred in, say, 10 seconds sounds about right. On my
    # home DSL line (50kBps upstream), that suggests 500kB. Most lines are
    # slower, maybe 10kBps, which suggests 100kB, and that's a bit more
    # memory than I want to hang on to, so I'm going to go with 50kB and see
    # how that works.
    CHUNK_SIZE = 50*1024

    def _loop(self, fire_when_done):
        """Fetch one chunk, then either finish or go around again."""
        # This slightly weird structure exists because Deferreds don't do
        # tail-recursion, so each one has to retire promptly. Simply
        # chaining them would overflow the stack at the end of a transfer of
        # more than a few hundred chunks. 'fire_when_done' is long-lived;
        # the Deferreds produced by each _fetch() call are not.
        began = time.time()

        def _chunk_done(finished):
            self._times["cumulative_fetch"] += time.time() - began
            if not finished:
                self._loop(fire_when_done)
                return
            self.log("finished reading ciphertext", level=log.NOISY)
            fire_when_done.callback(None)

        def _chunk_failed(f):
            self.log(format="[%(si)s] ciphertext read failed",
                     si=self._upload_id, failure=f, level=log.UNUSUAL)
            fire_when_done.errback(f)

        d = defer.maybeDeferred(self._fetch)
        d.addCallbacks(_chunk_done, _chunk_failed)
        return None

    def _fetch(self):
        """Request the next chunk of ciphertext.

        Returns True when nothing is left to fetch; otherwise returns a
        Deferred that fires with False once the chunk has been written.
        """
        remaining = self._expected_size - self._have
        fetch_size = min(remaining, self.CHUNK_SIZE)
        if fetch_size == 0:
            self._upload_helper._upload_status.set_progress(1, 1.0)
            return True # all done

        end = self._have + fetch_size
        # fraction complete once this chunk arrives (0.0 for an empty file)
        percent = (1.0 * end / self._expected_size) if self._expected_size else 0.0
        self.log(format="fetching [%(si)s] %(start)d-%(end)d of %(total)d (%(percent)d%%)",
                 si=self._upload_id,
                 start=self._have,
                 end=end,
                 total=self._expected_size,
                 percent=int(100.0*percent),
                 level=log.NOISY)

        def _got_data(ciphertext_v):
            for data in ciphertext_v:
                nbytes = len(data)
                self._f.write(data)
                self._have += nbytes
                self._ciphertext_fetched += nbytes
                self._upload_helper._helper.count("chk_upload_helper.fetched_bytes", nbytes)
                self._upload_helper._upload_status.set_progress(1, percent)
            return False # not done

        d = self.call("read_encrypted", self._have, fetch_size)
        d.addCallback(_got_data)
        return d

    def _done(self, res):
        """Close the incoming file and rename it into the encoding
        location."""
        self._f.close()
        self._f = None
        final_size = os.stat(self._incoming_file)[stat.ST_SIZE]
        self.log(format="done fetching ciphertext, size=%(size)d",
                 size=final_size,
                 level=log.NOISY)
        os.rename(self._incoming_file, self._encoding_file)

    def _done2(self, _ignored, started):
        """Record the total elapsed time, drop the readers, and fire the
        done-observers with None."""
        self.log("done2", level=log.NOISY)
        self._times["total"] = time.time() - started
        self._readers = []
        self._done_observers.fire(None)

    def _failed(self, f):
        """Close any open file, drop the readers, and fire the
        done-observers with the failure."""
        if self._f:
            self._f.close()
        self._readers = []
        self._done_observers.fire(f)

    def when_done(self):
        """Return a Deferred from the done-observer list."""
        return self._done_observers.when_fired()

    def get_times(self):
        """Return the timing dict ("cumulative_fetch" and "total")."""
        return self._times

    def get_ciphertext_fetched(self):
        """Return the number of ciphertext bytes fetched from readers."""
        return self._ciphertext_fetched
457
458
@implementer(interfaces.IEncryptedUploadable)
class LocalCiphertextReader(AskUntilSuccessMixin):
    """I serve ciphertext from the local encoding file. Requests for the
    encoding parameters, and the close message, are forwarded to a remote
    reader through call().
    """

    def __init__(self, upload_helper, storage_index, encoding_file):
        self._upload_helper = upload_helper
        self._storage_index = storage_index
        self._encoding_file = encoding_file
        self._readers = []
        self._status = None

    def start(self):
        """Mark the upload as "pushing", then record the size of the
        encoding file and open it for reading."""
        self._upload_helper._upload_status.set_status("pushing")
        self._size = os.stat(self._encoding_file)[stat.ST_SIZE]
        self.f = open(self._encoding_file, "rb")

    def get_size(self):
        """Return a fired Deferred carrying the size recorded by start()."""
        return defer.succeed(self._size)

    def get_all_encoding_parameters(self):
        """Ask a remote reader for the encoding parameters."""
        return self.call("get_all_encoding_parameters")

    def get_storage_index(self):
        """Return a fired Deferred carrying the storage index."""
        return defer.succeed(self._storage_index)

    def read_encrypted(self, length, hash_only):
        """Read ``length`` bytes from the local file; the Deferred fires
        with a one-element list holding the data."""
        assert hash_only is False

        def _as_list(data):
            return [data]

        d = defer.maybeDeferred(self.f.read, length)
        d.addCallback(_as_list)
        return d

    def close(self):
        """Close the local file and forward "close" to a remote reader."""
        self.f.close()
        # ??. It is unclear whether forwarding the close message makes sense.
        return self.call("close")

    # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3561
    def set_upload_status(self, upload_status):
        raise NotImplementedError
497
498
@implementer(interfaces.RIHelper, interfaces.IStatsProducer)
class Helper(Referenceable):  # type: ignore # warner/foolscap#78
    """
    :ivar dict[bytes, CHKUploadHelper] _active_uploads: A mapping from
        storage index to the upload helper, for every upload that has been
        started but has not yet finished.

    :ivar chk_checker: A callable returning an object like a
        CHKCheckerAndUEBFetcher instance which can check CHK shares.
        Primarily here so that tests can override it.

    :ivar chk_upload: A callable returning an object like a
        CHKUploadHelper instance which can upload CHK shares.  Primarily
        here so that tests can override it.
    """
    # This is the non-distributed version. When multiple helpers are
    # needed, this object will become the HelperCoordinator: it will query
    # the farm of Helpers to see whether any of them has the storage_index
    # of interest and send the request there. If none has it, a helper will
    # be chosen at random.

    name = "helper"
    VERSION = {
        b"http://allmydata.org/tahoe/protocols/helper/v1": {},
        b"application-version": allmydata.__full_version__.encode("utf-8"),
    }
    MAX_UPLOAD_STATUSES = 10

    chk_checker = CHKCheckerAndUEBFetcher
    chk_upload = CHKUploadHelper

    def __init__(self, basedir, storage_broker, secret_holder,
                 stats_provider, history):
        self._basedir = basedir
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        # partially-fetched ciphertext lives in CHK_incoming; complete
        # ciphertext is moved to CHK_encoding
        self._chk_incoming = os.path.join(basedir, "CHK_incoming")
        self._chk_encoding = os.path.join(basedir, "CHK_encoding")
        for spool_dir in (self._chk_incoming, self._chk_encoding):
            fileutil.make_dirs(spool_dir)
        self._active_uploads = {}
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        self.stats_provider = stats_provider
        if stats_provider:
            stats_provider.register_producer(self)
        self._counters = {
            "chk_upload_helper.upload_requests": 0,
            "chk_upload_helper.upload_already_present": 0,
            "chk_upload_helper.upload_need_upload": 0,
            "chk_upload_helper.resumes": 0,
            "chk_upload_helper.fetched_bytes": 0,
            "chk_upload_helper.encoded_bytes": 0,
        }
        self._history = history

    def log(self, *args, **kwargs):
        """Emit a log message, defaulting the facility."""
        kwargs.setdefault('facility', "tahoe.helper")
        return log.msg(*args, **kwargs)

    def count(self, key, value=1):
        """Add ``value`` to counter ``key``, both in the stats provider (if
        there is one) and in my own counters."""
        if self.stats_provider:
            self.stats_provider.count(key, value)
        self._counters[key] += value

    def get_stats(self):
        """Return a dict of statistics: the number of active uploads, the
        count/size/old-size of files in the incoming and encoding
        directories, and all of my counters."""
        max_age = 86400*2 # 48hours
        now = time.time()

        def _tally(dirname, names):
            # returns (file count, total bytes, bytes in files older than
            # max_age)
            count = total = old = 0
            for fname in names:
                s = os.stat(os.path.join(dirname, fname))
                size = s[stat.ST_SIZE]
                count += 1
                total += size
                if now - s[stat.ST_MTIME] > max_age:
                    old += size
            return count, total, old

        # both directories are listed before any file is examined
        inc_names = os.listdir(self._chk_incoming)
        enc_names = os.listdir(self._chk_encoding)
        inc_count, inc_size, inc_size_old = _tally(self._chk_incoming, inc_names)
        enc_count, enc_size, enc_size_old = _tally(self._chk_encoding, enc_names)

        stats = {
            'chk_upload_helper.active_uploads': len(self._active_uploads),
            'chk_upload_helper.incoming_count': inc_count,
            'chk_upload_helper.incoming_size': inc_size,
            'chk_upload_helper.incoming_size_old': inc_size_old,
            'chk_upload_helper.encoding_count': enc_count,
            'chk_upload_helper.encoding_size': enc_size,
            'chk_upload_helper.encoding_size_old': enc_size_old,
        }
        stats.update(self._counters)
        return stats

    def remote_get_version(self):
        """Return the VERSION dictionary."""
        return self.VERSION

    def remote_upload_chk(self, storage_index):
        """
        See ``RIHelper.upload_chk``
        """
        self.count("chk_upload_helper.upload_requests")
        lp = self.log(format="helper: upload_chk query for SI %(si)s",
                      si=si_b2a(storage_index))
        if storage_index in self._active_uploads:
            # somebody is already uploading this file: hand back that helper
            self.log("upload is currently active", parent=lp)
            return (None, self._active_uploads[storage_index])

        def _log_and_pass_on(f):
            self.log("error while checking for chk-already-in-grid",
                     failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg")
            return f

        d = self._check_chk(storage_index, lp)
        d.addCallback(self._did_chk_check, storage_index, lp)
        d.addErrback(_log_and_pass_on)
        return d

    def _check_chk(self, storage_index, lp):
        """Check whether the file is already in the grid.

        Returns a Deferred firing with a HelperUploadResults when the
        checker reports a result, or with None when it does not.
        """
        lp2 = self.log("doing a quick check+UEBfetch",
                       parent=lp, level=log.NOISY)
        checker = self.chk_checker(self._storage_broker.get_servers_for_psi,
                                   storage_index, lp2)

        def _checked(res):
            if not res:
                return None
            (sharemap, ueb_data, ueb_hash) = res
            self.log("found file in grid", level=log.NOISY, parent=lp)
            hur = upload.HelperUploadResults()
            hur.uri_extension_hash = ueb_hash
            hur.sharemap = sharemap
            hur.uri_extension_data = ueb_data
            hur.preexisting_shares = len(sharemap)
            hur.pushed_shares = 0
            return hur

        d = checker.check()
        d.addCallback(_checked)
        return d

    def _did_chk_check(self, already_present, storage_index, lp):
        """Build the (results, upload helper) pair for remote_upload_chk:
        (results, None) when the file is already in the grid, otherwise
        (None, upload helper)."""
        if already_present:
            # everything the caller needs is in the UploadResults
            self.count("chk_upload_helper.upload_already_present")
            self.log("file already found in grid", parent=lp)
            return (already_present, None)

        self.count("chk_upload_helper.upload_need_upload")
        # "Not present in the grid" means that fewer than 'N' shares are
        # available.
        self.log("unable to find file in the grid", parent=lp,
                 level=log.NOISY)
        # An upload helper is needed. The active uploads are consulted a
        # second time in case one was registered while the check ran.
        if storage_index not in self._active_uploads:
            self.log("creating new upload helper", parent=lp)
            uh = self._make_chk_upload_helper(storage_index, lp)
            self._active_uploads[storage_index] = uh
            self._add_upload(uh)
        else:
            self.log("upload is currently active", parent=lp)
            uh = self._active_uploads[storage_index]
        return (None, uh)

    def _make_chk_upload_helper(self, storage_index, lp):
        """Create an upload helper whose incoming and encoding files are
        named after the storage index."""
        si_s = si_b2a(storage_index).decode('ascii')
        return self.chk_upload(
            storage_index,
            self,
            self._storage_broker,
            self._secret_holder,
            os.path.join(self._chk_incoming, si_s),
            os.path.join(self._chk_encoding, si_s),
            lp,
        )

    def _add_upload(self, uh):
        """Track ``uh`` and, if there is a history object, notify it of the
        upload's status."""
        self._all_uploads[uh] = None
        if self._history:
            self._history.notify_helper_upload(uh.get_upload_status())

    def upload_finished(self, storage_index, size):
        """Retire the active upload for ``storage_index``.

        ``size`` is 0 when the upload failed.
        """
        self.count("chk_upload_helper.encoded_bytes", size)
        uh = self._active_uploads.pop(storage_index)
        uh.get_upload_status().set_active(False)
Note: See TracBrowser for help on using the repository browser.