source: trunk/misc/operations_helpers/spacetime/diskwatcher.tac

Last change on this file was b856238, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-15T15:53:34Z

remove old Python2 future statements

# -*- python -*-


"""
Run this tool with twistd in its own directory, with a file named 'urls.txt'
describing which nodes to query. Make sure to copy diskwatcher.py into the
same directory. It will request disk-usage numbers from the nodes once per
hour (or slower), and store them in a local database. It will compute
usage-per-unit-time values over several time ranges and make them available
through an HTTP query (on the port configured in ./webport). It will also
provide an estimate of how much time is left before the grid's storage is
exhausted.

There are munin plugins (named tahoe_doomsday and tahoe_diskusage) to graph
the values this tool computes.

Each line of urls.txt points to a single node. Each node should have its own
dedicated disk: if multiple nodes share a disk, list only one of them in
urls.txt (otherwise that space will be double-counted, confusing the
results). Each line should be of the form:

 http://host:webport/statistics?t=json

"""

# TODO:
#  built-in graphs on web interface


import os.path, time
import urllib.request # Python 3 home of the old urllib.urlopen
from datetime import timedelta
from twisted.application import internet, service, strports
from twisted.web import server, resource, http, client
from twisted.internet import defer
from twisted.python import log
import json
from axiom.attributes import AND
from axiom.store import Store
from epsilon import extime
from diskwatcher import Sample

#from axiom.item import Item
#from axiom.attributes import text, integer, timestamp

#class Sample(Item):
#    url = text()
#    when = timestamp()
#    used = integer()
#    avail = integer()

#s = Store("history.axiom")
#ns = Store("new-history.axiom")
#for sa in s.query(Sample):
#    diskwatcher.Sample(store=ns,
#                       url=sa.url, when=sa.when, used=sa.used, avail=sa.avail)
#print("done")

HOUR = 3600
DAY = 24*3600
WEEK = 7*DAY
MONTH = 30*DAY
YEAR = 365*DAY

class DiskWatcher(service.MultiService, resource.Resource):
    POLL_INTERVAL = 1*HOUR
    AVERAGES = {#"60s": 60,
                #"5m": 5*60,
                #"30m": 30*60,
                "1hr": 1*HOUR,
                "1day": 1*DAY,
                "2wk": 2*WEEK,
                "4wk": 4*WEEK,
                }

    def __init__(self):
        assert os.path.exists("diskwatcher.tac") # run from the right directory
        self.growth_cache = {}
        service.MultiService.__init__(self)
        resource.Resource.__init__(self)
        self.store = Store("history.axiom")
        self.store.whenFullyUpgraded().addCallback(self._upgrade_complete)
        service.IService(self.store).setServiceParent(self) # let upgrader run
        ts = internet.TimerService(self.POLL_INTERVAL, self.poll)
        ts.setServiceParent(self)

    def _upgrade_complete(self, ignored):
        print("Axiom store upgrade complete")

    def startService(self):
        service.MultiService.startService(self)

        try:
            with open("webport", "r") as f:
                desired_webport = f.read().strip()
        except EnvironmentError:
            desired_webport = None
        webport = desired_webport or "tcp:0"
        root = self
        serv = strports.service(webport, server.Site(root))
        serv.setServiceParent(self)
        if not desired_webport:
            got_port = serv._port.getHost().port
            with open("webport", "w") as f:
                f.write("tcp:%d\n" % got_port)


    def get_urls(self):
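        # Lines may carry trailing '#' comments, and blank lines are
        # skipped, e.g. (hypothetical host):
        #   http://node1.example.org:3456/statistics?t=json  # rack A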
        with open("urls.txt", "r") as f:
            for url in f:
                if "#" in url:
                    url = url[:url.find("#")]
                url = url.strip()
                if not url:
                    continue
                yield url

    def poll(self):
        log.msg("polling..")
        #return self.poll_synchronous()
        return self.poll_asynchronous()

    def poll_asynchronous(self):
        # this didn't actually seem to work any better than poll_synchronous:
        # logs are more noisy, and I got frequent DNS failures. But with a
        # lot of servers to query, this is probably the better way to go. A
        # significant advantage of this approach is that we can use a
        # timeout= argument to tolerate hanging servers.
        dl = []
        for url in self.get_urls():
            when = extime.Time()
            # getPage wants a bytes URL on Python 3
            d = client.getPage(url.encode("utf-8"), timeout=60)
            d.addCallback(self.got_response, when, url)
            dl.append(d)
        d = defer.DeferredList(dl)
        def _done(res):
            fetched = len([1 for (success, value) in res if success])
            log.msg("fetched %d of %d" % (fetched, len(dl)))
        d.addCallback(_done)
        return d

    def poll_synchronous(self):
        attempts = 0
        fetched = 0
        for url in self.get_urls():
            attempts += 1
            try:
                when = extime.Time()
                # if a server accepts the connection and then hangs, this
                # will block forever
                data_json = urllib.request.urlopen(url).read()
                self.got_response(data_json, when, url)
                fetched += 1
            except Exception:
                log.msg("error while fetching: %s" % url)
                log.err()
        log.msg("fetched %d of %d" % (fetched, attempts))

    def got_response(self, data_json, when, url):
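        # The node's /statistics?t=json response is expected to contain at
        # least these keys (the byte values here are illustrative):
        #
        #   {"stats": {"storage_server.disk_total": 1000000000000,
        #              "storage_server.disk_used":   400000000000,
        #              "storage_server.disk_avail":  600000000000}}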
        data = json.loads(data_json)
        total = data["stats"]["storage_server.disk_total"]
        used = data["stats"]["storage_server.disk_used"]
        avail = data["stats"]["storage_server.disk_avail"]
        print("%s : total=%s, used=%s, avail=%s" % (url,
                                                    total, used, avail))
        Sample(store=self.store,
               url=str(url), when=when, total=total, used=used, avail=avail)

    def calculate_growth_timeleft(self):
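        # For each averaging window this computes the growth rate and, when
        # growth is positive, timeleft = total_avail_space / growth.
        # Illustrative arithmetic (assumed numbers): 5 TB available and
        # 1 MB/s growth give 5e12 / 1e6 = 5e6 seconds, about 58 days.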
        timespans = []
        total_avail_space = self.find_total_available_space()
        pairs = [ (timespan,name)
                  for name,timespan in self.AVERAGES.items() ]
        pairs.sort()
        for (timespan,name) in pairs:
            growth = self.growth(timespan)
            print(name, total_avail_space, growth)
            if growth is not None:
                timeleft = None
                if growth > 0:
                    timeleft = total_avail_space / growth
                timespans.append( (name, timespan, growth, timeleft) )
        return timespans

    def find_total_space(self):
        # this returns the sum of disk-total stats for all servers that 1)
        # are listed in urls.txt and 2) have responded recently.
        now = extime.Time()
        recent = now - timedelta(seconds=2*self.POLL_INTERVAL)
        total_space = 0
        for url in self.get_urls():
            url = str(url)
            latest = list(self.store.query(Sample,
                                           AND(Sample.url == url,
                                               Sample.when > recent),
                                           sort=Sample.when.descending,
                                           limit=1))
            if latest:
                total_space += latest[0].total
        return total_space

    def find_total_available_space(self):
        # this returns the sum of disk-avail stats for all servers that 1)
        # are listed in urls.txt and 2) have responded recently.
        now = extime.Time()
        recent = now - timedelta(seconds=2*self.POLL_INTERVAL)
        total_avail_space = 0
        for url in self.get_urls():
            url = str(url)
            latest = list(self.store.query(Sample,
                                           AND(Sample.url == url,
                                               Sample.when > recent),
                                           sort=Sample.when.descending,
                                           limit=1))
            if latest:
                total_avail_space += latest[0].avail
        return total_avail_space

    def find_total_used_space(self):
        # this returns the sum of disk-used stats for all servers that 1) are
        # listed in urls.txt and 2) have responded recently.
        now = extime.Time()
        recent = now - timedelta(seconds=2*self.POLL_INTERVAL)
        total_used_space = 0
        for url in self.get_urls():
            url = str(url)
            latest = list(self.store.query(Sample,
                                           AND(Sample.url == url,
                                               Sample.when > recent),
                                           sort=Sample.when.descending,
                                           limit=1))
            if latest:
                total_used_space += latest[0].used
        return total_used_space


    def growth(self, timespan):
        """Calculate the bytes-per-second growth of the total disk-used stat,
        over a period of TIMESPAN seconds (i.e. between the most recent
        sample and the newest sample that is at least TIMESPAN seconds
        older), summed over all nodes which 1) are listed in urls.txt, 2)
        have responded recently, and 3) have a response at least as old as
        TIMESPAN. If no nodes meet these criteria, return None; this is
        likely to happen for the longer timespans (4wk) until the gatherer
        has been running and collecting data for that long."""

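        # Worked example (assumed numbers): if a node's newest sample shows
        # used=1.10e12 bytes and its newest sample older than the timespan
        # shows used=1.00e12 bytes, taken 25 hours (90000s) apart, the node
        # contributes (1.10e12 - 1.00e12) / 90000 ~= 1.1e6 bytes/sec.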
        # a note about workload: for our oldest storage servers, as of
        # 25-Jan-2009, the first DB query here takes about 40ms per server
        # URL (some take as little as 10ms). There are about 110 servers, and
        # two queries each, so the growth() function takes about 7s to run
        # for each timespan. We track 4 timespans, and find_total_*_space()
        # takes about 2.3s to run, so calculate_growth_timeleft() takes about
        # 27s. Each HTTP query thus takes 27s, and we have six munin plugins
        # which perform HTTP queries every 5 minutes. By adding growth_cache(),
        # I hope to reduce this: the first HTTP query will still take 27s,
        # but the subsequent five should be about 2.3s each.

        # we're allowed to cache this value for 3 minutes
        if timespan in self.growth_cache:
            (when, value) = self.growth_cache[timespan]
            if time.time() - when < 3*60:
                return value

        td = timedelta(seconds=timespan)
        now = extime.Time()
        then = now - td
        recent = now - timedelta(seconds=2*self.POLL_INTERVAL)

        total_growth = 0.0
        num_nodes = 0

        for url in self.get_urls():
            url = str(url)
            latest = list(self.store.query(Sample,
                                           AND(Sample.url == url,
                                               Sample.when > recent),
                                           sort=Sample.when.descending,
                                           limit=1))
            if not latest:
                #print("no latest sample from", url)
                continue # skip this node
            latest = latest[0]
            old = list(self.store.query(Sample,
                                        AND(Sample.url == url,
                                            Sample.when < then),
                                        sort=Sample.when.descending,
                                        limit=1))
            if not old:
                #print("no old sample from", url)
                continue # skip this node
            old = old[0]
            duration = latest.when.asPOSIXTimestamp() - old.when.asPOSIXTimestamp()
            if not duration:
                print("only one sample from", url)
                continue

            rate = float(latest.used - old.used) / duration
            #print(url, rate)
            total_growth += rate
            num_nodes += 1

        if not num_nodes:
            return None
        self.growth_cache[timespan] = (time.time(), total_growth)
        return total_growth

    def getChild(self, path, req):
        # twisted.web hands us the path segment as bytes on Python 3
        if path == b"":
            return self
        return resource.Resource.getChild(self, path, req)

    def abbreviate_time(self, s):
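        # Sample conversions (illustrative): abbreviate_time(90) ->
        # "90 seconds"; abbreviate_time(90000) -> "25 hours" (90000/HOUR = 25).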
        def _plural(count, unit):
            count = int(count)
            if count == 1:
                return "%d %s" % (count, unit)
            return "%d %ss" % (count, unit)
        if s is None:
            return "unknown"
        if s < 120:
            return _plural(s, "second")
        if s < 3*HOUR:
            return _plural(s/60, "minute")
        if s < 2*DAY:
            return _plural(s/HOUR, "hour")
        if s < 2*MONTH:
            return _plural(s/DAY, "day")
        if s < 4*YEAR:
            return _plural(s/MONTH, "month")
        return _plural(s/YEAR, "year")

    def abbreviate_space2(self, s, SI=True):
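        # Sample conversions (illustrative):
        #   abbreviate_space2(1234567, SI=True)  -> "1.23 MB"
        #   abbreviate_space2(1234567, SI=False) -> "1.18 MiB"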
        if s is None:
            return "unknown"
        if SI:
            U = 1000.0
            isuffix = "B"
        else:
            U = 1024.0
            isuffix = "iB"
        def r(count, suffix):
            return "%.2f %s%s" % (count, suffix, isuffix)

        if s < 1024: # 1000-1023 get emitted as bytes, even in SI mode
            return r(s, "")
        if s < U*U:
            return r(s/U, "k")
        if s < U*U*U:
            return r(s/(U*U), "M")
        if s < U*U*U*U:
            return r(s/(U*U*U), "G")
        if s < U*U*U*U*U:
            return r(s/(U*U*U*U), "T")
        return r(s/(U*U*U*U*U), "P")

    def abbreviate_space(self, s):
        return "(%s, %s)" % (self.abbreviate_space2(s, True),
                             self.abbreviate_space2(s, False))

    def render(self, req):
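        # Typical queries (hypothetical host and port):
        #   http://localhost:8080/?t=html  (the default) -> plain-text summary
        #   http://localhost:8080/?t=json  -> JSON rates and totals
        # Any other ?t= value gets a 400 response.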
        # twisted.web request arguments are bytes on Python 3
        t = req.args.get(b"t", [b"html"])[0].decode("utf-8", "replace")
        ctype = "text/plain"
        data = ""
        if t == "html":
            data = ""
            for (name, timespan, growth, timeleft) in self.calculate_growth_timeleft():
                data += "%f bytes per second (%sps), %s remaining (over %s)\n" % \
                        (growth, self.abbreviate_space2(growth, True),
                         self.abbreviate_time(timeleft), name)
            used = self.find_total_used_space()
            data += "total used: %d bytes %s\n" % (used,
                                                   self.abbreviate_space(used))
            total = self.find_total_space()
            data += "total space: %d bytes %s\n" % (total,
                                                    self.abbreviate_space(total))
        elif t == "json":
            current = {"rates": self.calculate_growth_timeleft(),
                       "total": self.find_total_space(),
                       "used": self.find_total_used_space(),
                       "available": self.find_total_available_space(),
                       }
            data = json.dumps(current, indent=True)
        else:
            req.setResponseCode(http.BAD_REQUEST)
            data = "Unknown t= %s\n" % t
        req.setHeader("content-type", ctype)
        # render() must return bytes on Python 3
        return data.encode("utf-8")

application = service.Application("disk-watcher")
DiskWatcher().setServiceParent(application)