# -*- python -*-

"""
Run this tool with twistd in its own directory, with a file named 'urls.txt'
describing which nodes to query. Make sure to copy diskwatcher.py into the
same directory. It will request disk-usage numbers from the nodes once per
hour (or slower), and store them in a local database. It will compute
usage-per-unit-time values over several time ranges and make them available
through an HTTP query (using ./webport). It will also provide an estimate of
how much time is left before the grid's storage is exhausted.

There are munin plugins (named tahoe_doomsday and tahoe_diskusage) to graph
the values this tool computes.

Each line of urls.txt points to a single node. Each node should have its own
dedicated disk: if multiple nodes share a disk, only list one of them in
urls.txt (otherwise that space will be double-counted, confusing the
results). Each line should be in the form:

 http://host:webport/statistics?t=json
"""

# TODO:
#  built-in graphs on web interface

import os.path
import time
import urllib
import json
from datetime import timedelta

from twisted.application import internet, service, strports
from twisted.web import server, resource, http, client
from twisted.internet import defer
from twisted.python import log
from axiom.attributes import AND
from axiom.store import Store
from epsilon import extime

from diskwatcher import Sample

# One-off migration that was once used to copy an older history database
# into a new store using the diskwatcher.Sample schema; kept for reference.
#from axiom.item import Item
#from axiom.attributes import text, integer, timestamp
#class Sample(Item):
#    url = text()
#    when = timestamp()
#    used = integer()
#    avail = integer()
#s = Store("history.axiom")
#ns = Store("new-history.axiom")
#for sa in s.query(Sample):
#    diskwatcher.Sample(store=ns,
#                       url=sa.url, when=sa.when, used=sa.used, avail=sa.avail)
#print "done"

HOUR = 3600
DAY = 24*3600
WEEK = 7*DAY
MONTH = 30*DAY
YEAR = 365*DAY


class DiskWatcher(service.MultiService, resource.Resource):
    """Poll node statistics periodically and serve growth estimates over HTTP.

    Samples are stored in the Axiom store ./history.axiom. The web resource
    (listening on the strports description in ./webport) reports aggregate
    disk usage, growth rates over several timespans, and the estimated time
    until the available space is exhausted.
    """

    POLL_INTERVAL = 1*HOUR
    # name -> averaging window in seconds, used by calculate_growth_timeleft()
    AVERAGES = {#"60s": 60,
                #"5m": 5*60,
                #"30m": 30*60,
                "1hr": 1*HOUR,
                "1day": 1*DAY,
                "2wk": 2*WEEK,
                "4wk": 4*WEEK,
                }

    # how long (seconds) a growth() result may be served from growth_cache
    GROWTH_CACHE_SECONDS = 3*60

    def __init__(self):
        # all state files (urls.txt, webport, history.axiom) are relative to
        # the current directory, so insist on being run from our own dir
        if not os.path.exists("diskwatcher.tac"):
            raise RuntimeError("diskwatcher must be run from the directory "
                               "containing diskwatcher.tac")
        # timespan -> (time.time() when computed, growth value or None)
        self.growth_cache = {}
        service.MultiService.__init__(self)
        resource.Resource.__init__(self)
        self.store = Store("history.axiom")
        self.store.whenFullyUpgraded().addCallback(self._upgrade_complete)
        service.IService(self.store).setServiceParent(self) # let upgrader run
        ts = internet.TimerService(self.POLL_INTERVAL, self.poll)
        ts.setServiceParent(self)

    def _upgrade_complete(self, ignored):
        log.msg("Axiom store upgrade complete")

    def startService(self):
        """Start child services and the web listener.

        The listening port is read from ./webport. If that file is missing
        or empty, an ephemeral TCP port is used and recorded in ./webport so
        later runs reuse it.
        """
        service.MultiService.startService(self)

        try:
            with open("webport", "r") as f:
                desired_webport = f.read().strip()
        except EnvironmentError:
            desired_webport = None
        webport = desired_webport or "tcp:0"
        serv = strports.service(webport, server.Site(self))
        serv.setServiceParent(self)
        if not desired_webport:
            got_port = serv._port.getHost().port
            with open("webport", "w") as f:
                f.write("tcp:%d\n" % got_port)

    def get_urls(self):
        """Yield each node URL listed in ./urls.txt.

        Anything after a '#' on a line is a comment; blank lines are skipped.
        The file is read eagerly (and closed) on first iteration.
        """
        with open("urls.txt", "r") as f:
            lines = f.readlines()
        for line in lines:
            url = line.split("#", 1)[0].strip()
            if url:
                yield url

    def poll(self):
        """Called by the TimerService every POLL_INTERVAL seconds."""
        log.msg("polling..")
        #return self.poll_synchronous()
        return self.poll_asynchronous()

    def poll_asynchronous(self):
        """Fetch every node's stats concurrently and record the samples.

        Returns a Deferred that fires (never errbacks) once every fetch has
        finished, after logging how many succeeded. Individual failures are
        logged with their URL and consumed, so they do not resurface later
        as "Unhandled error in Deferred".
        """
        # this didn't actually seem to work any better than poll_synchronous:
        # logs are more noisy, and I got frequent DNS failures. But with a
        # lot of servers to query, this is probably the better way to go. A
        # significant advantage of this approach is that we can use a
        # timeout= argument to tolerate hanging servers.
        dl = []
        for url in self.get_urls():
            when = extime.Time()
            d = client.getPage(url, timeout=60)
            d.addCallback(self.got_response, when, url)
            d.addErrback(self._fetch_failed, url)
            dl.append(d)
        d = defer.DeferredList(dl, consumeErrors=True)
        def _done(res):
            fetched = len([1 for (success, value) in res if success])
            log.msg("fetched %d of %d" % (fetched, len(dl)))
        d.addCallback(_done)
        return d

    def _fetch_failed(self, f, url):
        # Log which URL failed, then keep the Failure so the DeferredList
        # records it as unsuccessful (and consumes it).
        log.msg("error while fetching: %s" % url)
        log.err(f)
        return f

    def poll_synchronous(self):
        """Fetch every node's stats one at a time, blocking the reactor."""
        attempts = 0
        fetched = 0
        for url in self.get_urls():
            attempts += 1
            try:
                when = extime.Time()
                # if a server accepts the connection and then hangs, this
                # will block forever
                data_json = urllib.urlopen(url).read()
                self.got_response(data_json, when, url)
                fetched += 1
            except Exception:
                log.msg("error while fetching: %s" % url)
                log.err()
        log.msg("fetched %d of %d" % (fetched, attempts))

    def got_response(self, data_json, when, url):
        """Parse a node's JSON statistics and store one Sample for it.

        Raises KeyError if the expected storage_server.* stats are missing.
        """
        stats = json.loads(data_json)[u"stats"]
        total = stats[u"storage_server.disk_total"]
        used = stats[u"storage_server.disk_used"]
        avail = stats[u"storage_server.disk_avail"]
        log.msg("%s : total=%s, used=%s, avail=%s" % (url,
                                                      total, used, avail))
        Sample(store=self.store,
               url=unicode(url), when=when, total=total, used=used,
               avail=avail)

    def calculate_growth_timeleft(self):
        """Return [(name, timespan, growth, timeleft), ...] sorted by timespan.

        growth is in bytes per second; timeleft is seconds until the
        currently-available space runs out, or None if usage is not growing.
        Timespans with no usable data (growth() returned None) are omitted.
        """
        timespans = []
        total_avail_space = self.find_total_available_space()
        for (timespan, name) in sorted((ts, n)
                                       for (n, ts) in self.AVERAGES.items()):
            growth = self.growth(timespan)
            log.msg("%s: avail=%s growth=%s" % (name, total_avail_space, growth))
            if growth is None:
                continue
            timeleft = None
            if growth > 0:
                timeleft = total_avail_space / growth
            timespans.append( (name, timespan, growth, timeleft) )
        return timespans

    def _recent_cutoff(self, now):
        # a node counts as "responding recently" if it has a sample newer
        # than two poll intervals ago
        return now - timedelta(seconds=2*self.POLL_INTERVAL)

    def _latest_sample(self, url, recent):
        """Return the newest Sample for URL taken after RECENT, or None."""
        latest = list(self.store.query(Sample,
                                       AND(Sample.url == url,
                                           Sample.when > recent),
                                       sort=Sample.when.descending,
                                       limit=1))
        if latest:
            return latest[0]
        return None

    def _sum_latest(self, attrname):
        """Sum ATTRNAME of the latest recent Sample of each URL in urls.txt.

        Nodes that have not responded recently contribute nothing.
        """
        recent = self._recent_cutoff(extime.Time())
        total = 0
        for url in self.get_urls():
            sample = self._latest_sample(unicode(url), recent)
            if sample is not None:
                total += getattr(sample, attrname)
        return total

    def find_total_space(self):
        # this returns the sum of disk-total stats for all servers that 1)
        # are listed in urls.txt and 2) have responded recently.
        return self._sum_latest("total")

    def find_total_available_space(self):
        # this returns the sum of disk-avail stats for all servers that 1)
        # are listed in urls.txt and 2) have responded recently.
        return self._sum_latest("avail")

    def find_total_used_space(self):
        # this returns the sum of disk-used stats for all servers that 1) are
        # listed in urls.txt and 2) have responded recently.
        return self._sum_latest("used")

    def growth(self, timespan):
        """Calculate the bytes-per-second growth of the total disk-used stat,
        over a period of TIMESPAN seconds (i.e. between the most recent sample
        and the latest one that's at least TIMESPAN seconds ago), summed over
        all nodes which 1) are listed in urls.txt, 2) have responded
        recently, and 3) have a response at least as old as TIMESPAN.

        If there are no nodes which meet these criteria, we'll return None;
        this is likely to happen for the longer timespans (4wk) until the
        gatherer has been running and collecting data for that long.

        Results (including None) are cached for GROWTH_CACHE_SECONDS."""

        # a note about workload: for our oldest storage servers, as of
        # 25-Jan-2009, the first DB query here takes about 40ms per server
        # URL (some take as little as 10ms). There are about 110 servers, and
        # two queries each, so the growth() function takes about 7s to run
        # for each timespan. We track 4 timespans, and find_total_*_space()
        # takes about 2.3s to run, so calculate_growth_timeleft() takes about
        # 27s. Each HTTP query thus takes 27s, and we have six munin plugins
        # which perform HTTP queries every 5 minutes. By adding growth_cache,
        # I hope to reduce this: the first HTTP query will still take 27s,
        # but the subsequent five should be about 2.3s each. A None result
        # is cached too, otherwise timespans without enough history would
        # pay the full cost on every request.

        if timespan in self.growth_cache:
            (when, value) = self.growth_cache[timespan]
            if time.time() - when < self.GROWTH_CACHE_SECONDS:
                return value

        now = extime.Time()
        then = now - timedelta(seconds=timespan)
        recent = self._recent_cutoff(now)

        total_growth = 0.0
        num_nodes = 0

        for url in self.get_urls():
            url = unicode(url)
            latest = self._latest_sample(url, recent)
            if latest is None:
                continue # skip this node: no recent sample

            old = list(self.store.query(Sample,
                                        AND(Sample.url == url,
                                            Sample.when < then),
                                        sort=Sample.when.descending,
                                        limit=1))
            if not old:
                continue # skip this node: no sample older than TIMESPAN
            old = old[0]

            duration = latest.when.asPOSIXTimestamp() - old.when.asPOSIXTimestamp()
            if not duration:
                # the newest sample is also the one older than TIMESPAN
                log.msg("only one sample from %s" % url)
                continue

            total_growth += float(latest.used - old.used) / duration
            num_nodes += 1

        result = total_growth if num_nodes else None
        self.growth_cache[timespan] = (time.time(), result)
        return result

    def getChild(self, path, req):
        if path == "":
            return self
        return resource.Resource.getChild(self, path, req)

    def abbreviate_time(self, s):
        """Render S seconds as a coarse human-readable duration."""
        def _plural(count, unit):
            count = int(count)
            if count == 1:
                return "%d %s" % (count, unit)
            return "%d %ss" % (count, unit)
        if s is None:
            return "unknown"
        if s < 120:
            return _plural(s, "second")
        if s < 3*HOUR:
            return _plural(s/60, "minute")
        if s < 2*DAY:
            return _plural(s/HOUR, "hour")
        if s < 2*MONTH:
            return _plural(s/DAY, "day")
        if s < 4*YEAR:
            return _plural(s/MONTH, "month")
        return _plural(s/YEAR, "year")

    def abbreviate_space2(self, s, SI=True):
        """Render S bytes with a k/M/G/T/P prefix.

        SI=True uses powers of 1000 ("kB"), otherwise powers of 1024
        ("kiB"). The unit is chosen from the magnitude, so negative values
        (e.g. shrinking usage rates) are abbreviated just like positive ones.
        """
        if s is None:
            return "unknown"
        if SI:
            U = 1000.0
            isuffix = "B"
        else:
            U = 1024.0
            isuffix = "iB"
        def r(count, suffix):
            return "%.2f %s%s" % (count, suffix, isuffix)

        magnitude = abs(s)
        if magnitude < 1024: # 1000-1023 get emitted as bytes, even in SI mode
            return "%.2f B" % s
        for (exponent, prefix) in ((1, "k"), (2, "M"), (3, "G"), (4, "T")):
            if magnitude < U ** (exponent + 1):
                return r(s / U ** exponent, prefix)
        return r(s / U ** 5, "P")

    def abbreviate_space(self, s):
        return "(%s, %s)" % (self.abbreviate_space2(s, True),
                             self.abbreviate_space2(s, False))

    def render(self, req):
        """Serve the current estimates; ?t=html (default, plain text) or json."""
        t = req.args.get("t", ["html"])[0]
        ctype = "text/plain"
        data = ""
        if t == "html":
            data = ""
            for (name, timespan, growth, timeleft) in self.calculate_growth_timeleft():
                data += "%f bytes per second (%sps), %s remaining (over %s)\n" % \
                        (growth, self.abbreviate_space2(growth, True),
                         self.abbreviate_time(timeleft), name)
            used = self.find_total_used_space()
            data += "total used: %d bytes %s\n" % (used,
                                                   self.abbreviate_space(used))
            total = self.find_total_space()
            data += "total space: %d bytes %s\n" % (total,
                                                    self.abbreviate_space(total))
        elif t == "json":
            current = {"rates": self.calculate_growth_timeleft(),
                       "total": self.find_total_space(),
                       "used": self.find_total_used_space(),
                       "available": self.find_total_available_space(),
                       }
            data = json.dumps(current, indent=True)
        else:
            req.setResponseCode(http.BAD_REQUEST)
            data = "Unknown t= %s\n" % t
        req.setHeader("content-type", ctype)
        return data


application = service.Application("disk-watcher")
DiskWatcher().setServiceParent(application)