source: trunk/src/allmydata/immutable/downloader/status.py

Last change on this file was fec97256, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2025-01-06T21:51:37Z

trim Python2 syntax

  • Property mode set to 100644
File size: 9.0 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5import itertools
6from zope.interface import implementer
7from allmydata.interfaces import IDownloadStatus
8
class ReadEvent:
    """Handle used to update one read-event record after it was created.

    Wraps the event mapping ``ev`` (mutated in place) together with the
    status object ``ds`` that is told about finish timestamps.
    """

    def __init__(self, ev, ds):
        self._ds = ds
        self._ev = ev

    def update(self, bytes_returned, decrypttime, pausetime):
        """Add the given increments to the record's running totals."""
        increments = (
            ("bytes_returned", bytes_returned),
            ("decrypt_time", decrypttime),
            ("paused_time", pausetime),
        )
        for key, delta in increments:
            self._ev[key] += delta

    def finished(self, finishtime):
        """Stamp the record's finish time and report it to the status."""
        record, status = self._ev, self._ds
        record["finish_time"] = finishtime
        status.update_last_timestamp(finishtime)
23
24
class SegmentEvent:
    """Handle used to update one segment-request record after creation.

    Wraps the event mapping ``ev`` (mutated in place) together with the
    status object ``ds`` that is told about resolution timestamps.
    """

    def __init__(self, ev, ds):
        self._ds = ds
        self._ev = ev

    def activate(self, when):
        """Record when work began; only the first call has any effect."""
        if self._ev["active_time"] is not None:
            return
        self._ev["active_time"] = when

    def deliver(self, when, start, length, decodetime):
        """Mark the segment as successfully delivered at time ``when``.

        The record must already have been activated.
        """
        assert self._ev["active_time"] is not None
        self._ev.update(
            finish_time=when,
            success=True,
            decode_time=decodetime,
            segment_start=start,
            segment_length=length,
        )
        self._ds.update_last_timestamp(when)

    def error(self, when):
        """Mark the segment request as failed at time ``when``."""
        self._ev.update(finish_time=when, success=False)
        self._ds.update_last_timestamp(when)
48
49
class DYHBEvent:
    """Handle used to resolve one "do you have a share" request record.

    Wraps the event mapping ``ev`` (mutated in place) together with the
    status object ``ds`` that is told about resolution timestamps.
    """

    def __init__(self, ev, ds):
        self._ds = ds
        self._ev = ev

    def error(self, when):
        """Mark the request as failed at time ``when``."""
        self._ev.update(finish_time=when, success=False)
        self._ds.update_last_timestamp(when)

    def finished(self, shnums, when):
        """Mark the request as successful and store the reported shnums."""
        self._ev.update(
            finish_time=when,
            success=True,
            response_shnums=shnums,
        )
        self._ds.update_last_timestamp(when)
66
67
class BlockRequestEvent:
    """Handle used to resolve one share-data (block) request record.

    Wraps the event mapping ``ev`` (mutated in place) together with the
    status object ``ds`` that is told about resolution timestamps.
    """

    def __init__(self, ev, ds):
        self._ds = ds
        self._ev = ev

    def finished(self, received, when):
        """Mark the request as successful and store the received length."""
        self._ev.update(
            finish_time=when,
            success=True,
            response_length=received,
        )
        self._ds.update_last_timestamp(when)

    def error(self, when):
        """Mark the request as failed at time ``when``."""
        self._ev.update(finish_time=when, success=False)
        self._ds.update_last_timestamp(when)
84
85
@implementer(IDownloadStatus)
class DownloadStatus:
    """Accumulates the activity record of one download.

    Each ``add_*`` method appends a new event dict to the matching list and,
    for the four request kinds, returns a small handle object through which
    the caller later fills in the outcome.
    """
    # There is one DownloadStatus for each CiphertextFileNode. The status
    # object will keep track of all activity for that node.
    statusid_counter = itertools.count(0)

    def __init__(self, storage_index, size):
        self.storage_index = storage_index
        self.size = size
        # Draws from the class-level counter shared by all instances.
        self.counter = next(self.statusid_counter)
        self.helper = False

        self.first_timestamp = None
        self.last_timestamp = None

        # all four of these _events lists are sorted by start_time, because
        # they are strictly append-only (some elements are later mutated in
        # place, but none are removed or inserted in the middle).

        # self.read_events tracks read() requests. It is a list of dicts,
        # each with the following keys:
        #  start,length  (of data requested)
        #  start_time
        #  finish_time (None until finished)
        #  bytes_returned (starts at 0, grows as segments are delivered)
        #  decrypt_time (time spent in decrypt, starts at 0)
        #  paused_time (time spent paused by client via pauseProducing)
        self.read_events = []

        # self.segment_events tracks segment requests and their resolution.
        # It is a list of dicts:
        #  segment_number
        #  start_time
        #  active_time (None until work has begun)
        #  decode_time (time spent in decode, None until delivered)
        #  finish_time (None until resolved)
        #  success (None until resolved, then boolean)
        #  segment_start (file offset of first byte, None until delivered)
        #  segment_length (None until delivered)
        self.segment_events = []

        # self.dyhb_requests tracks "do you have a share" requests and
        # responses. It is a list of dicts:
        #  server (instance of IServer)
        #  start_time
        #  success (None until resolved, then boolean)
        #  response_shnums (tuple, None until successful)
        #  finish_time (None until resolved)
        self.dyhb_requests = []

        # self.block_requests tracks share-data requests and responses. It is
        # a list of dicts:
        #  server (instance of IServer)
        #  shnum,
        #  start,length,  (of data requested)
        #  start_time
        #  finish_time (None until resolved)
        #  success (None until resolved, then bool)
        #  response_length (None until success)
        self.block_requests = []

        self.known_shares = [] # (server, shnum)
        self.problems = []

        self.misc_events = []

    def _note_first_timestamp(self, when):
        # Remember the start time of the earliest read/segment request seen.
        if self.first_timestamp is None:
            self.first_timestamp = when

    def add_misc_event(self, what, start, finish=None):
        """Append a free-form event; timestamps are stored as given."""
        self.misc_events.append({
            "what": what,
            "start_time": start,
            "finish_time": finish,
        })

    def add_read_event(self, start, length, when):
        """Record a new read() request and return its ReadEvent handle."""
        self._note_first_timestamp(when)
        r = {
            "start": start,
            "length": length,
            "start_time": when,
            "finish_time": None,
            "bytes_returned": 0,
            "decrypt_time": 0,
            "paused_time": 0,
        }
        self.read_events.append(r)
        return ReadEvent(r, self)

    def add_segment_request(self, segnum, when):
        """Record a new segment request and return its SegmentEvent handle."""
        self._note_first_timestamp(when)
        r = {
            "segment_number": segnum,
            "start_time": when,
            "active_time": None,
            "finish_time": None,
            "success": None,
            "decode_time": None,
            "segment_start": None,
            "segment_length": None,
        }
        self.segment_events.append(r)
        return SegmentEvent(r, self)

    def add_dyhb_request(self, server, when):
        """Record a new DYHB request and return its DYHBEvent handle.

        Unlike read and segment requests, this does not set first_timestamp.
        """
        r = {
            "server": server,
            "start_time": when,
            "success": None,
            "response_shnums": None,
            "finish_time": None,
        }
        self.dyhb_requests.append(r)
        return DYHBEvent(r, self)

    def add_block_request(self, server, shnum, start, length, when):
        """Record a new block request and return its BlockRequestEvent handle.

        Unlike read and segment requests, this does not set first_timestamp.
        """
        r = {
            "server": server,
            "shnum": shnum,
            "start": start,
            "length": length,
            "start_time": when,
            "finish_time": None,
            "success": None,
            "response_length": None,
        }
        self.block_requests.append(r)
        return BlockRequestEvent(r, self)

    def update_last_timestamp(self, when):
        """Advance last_timestamp to ``when`` if ``when`` is later."""
        if self.last_timestamp is None or when > self.last_timestamp:
            self.last_timestamp = when

    def add_known_share(self, server, shnum): # XXX use me
        self.known_shares.append((server, shnum))

    def add_problem(self, p):
        self.problems.append(p)

    # IDownloadStatus methods
    def get_counter(self):
        return self.counter

    def get_storage_index(self):
        return self.storage_index

    def get_size(self):
        return self.size

    def get_status(self):
        """Return a one-line summary of outstanding and failed segments.

        The result is "idle" or "fetching segment(s) ...", optionally
        followed by "; errors on segment(s) ...".
        """
        # mention all outstanding segment requests
        outstanding = {s_ev["segment_number"]
                       for s_ev in self.segment_events
                       if s_ev["finish_time"] is None}
        errorful = {s_ev["segment_number"]
                    for s_ev in self.segment_events
                    if s_ev["success"] is False}

        def join(segnums):
            if len(segnums) == 1:
                return "segment %s" % next(iter(segnums))
            return "segments %s" % ",".join(str(i) for i in sorted(segnums))

        error_s = ""
        if errorful:
            error_s = "; errors on %s" % join(errorful)
        if outstanding:
            s = "fetching %s" % join(outstanding)
        else:
            s = "idle"
        return s + error_s

    def get_progress(self):
        """Return the completed fraction of the unfinished read events.

        Returns 0.0 when no read has been recorded at all, and 1.0 when
        the unfinished reads request no bytes in total (which includes
        the case where every read has finished).
        """
        # measure all read events that aren't completely done, return the
        # total percentage complete for them
        if not self.read_events:
            return 0.0
        # completed requests are ignored
        unfinished = [r_ev for r_ev in self.read_events
                      if r_ev["finish_time"] is None]
        total_outstanding = sum(r_ev["length"] for r_ev in unfinished)
        total_received = sum(r_ev["bytes_returned"] for r_ev in unfinished)
        if not total_outstanding:
            return 1.0
        return total_received / total_outstanding

    def using_helper(self):
        """Return the ``helper`` flag, which __init__ sets to False."""
        return self.helper

    def get_active(self):
        """Return True if at least one read() call is still outstanding."""
        # a download is considered active if it has at least one outstanding
        # read() call
        return any(r_ev["finish_time"] is None for r_ev in self.read_events)

    def get_started(self):
        return self.first_timestamp

    def get_results(self):
        return None # TODO
Note: See TracBrowser for help on using the repository browser.