""" Crawl the storage server shares. Ported to Python 3. """ import os import time import json import struct from twisted.internet import reactor from twisted.application import service from twisted.python.filepath import FilePath from allmydata.storage.common import si_b2a from allmydata.util import fileutil class TimeSliceExceeded(Exception): pass class MigratePickleFileError(Exception): """ A pickle-format file exists (the FilePath to the file will be the single arg). """ pass def _convert_cycle_data(state): """ :param dict state: cycle-to-date or history-item state :return dict: the state in the JSON form """ def _convert_expiration_mode(value): # original is a 4-tuple, with the last element being a 2-tuple # .. convert both to lists return [ value[0], value[1], value[2], list(value[3]), ] def _convert_lease_age(value): # if we're in cycle-to-date, this is a dict if isinstance(value, dict): return { "{},{}".format(k[0], k[1]): v for k, v in value.items() } # otherwise, it's a history-item and they're 3-tuples return [ list(v) for v in value ] converters = { "configured-expiration-mode": _convert_expiration_mode, "cycle-start-finish-times": list, "lease-age-histogram": _convert_lease_age, "corrupt-shares": lambda value: [ list(x) for x in value ], "leases-per-share-histogram": lambda value: { str(k): v for k, v in value.items() }, } return { k: converters.get(k, lambda z: z)(v) for k, v in state.items() } def _convert_pickle_state_to_json(state): """ :param dict state: the pickled state :return dict: the state in the JSON form """ assert state["version"] == 1, "Only known version is 1" converters = { "cycle-to-date": _convert_cycle_data, } return { k: converters.get(k, lambda x: x)(v) for k, v in state.items() } def _upgrade_pickle_to_json(state_path, convert_pickle): """ :param FilePath state_path: the filepath to ensure is json :param Callable[dict] convert_pickle: function to change pickle-style state into JSON-style state :returns FilePath: the local path where the state is stored If this state is pickle, convert to the JSON format and return the JSON path. """ json_state_path = state_path.siblingExtension(".json") # if there's no file there at all, we're done because there's # nothing to upgrade if not state_path.exists(): return json_state_path # upgrade the pickle data to JSON import pickle with state_path.open("rb") as f: state = pickle.load(f) new_state = convert_pickle(state) _dump_json_to_file(new_state, json_state_path) # we've written the JSON, delete the pickle state_path.remove() return json_state_path def _confirm_json_format(fp): """ :param FilePath fp: the original (pickle) name of a state file This confirms that we do _not_ have the pickle-version of a state-file and _do_ either have nothing, or the JSON version. If the pickle-version exists, an exception is raised. :returns FilePath: the JSON name of a state file """ if fp.path.endswith(".json"): return fp jsonfp = fp.siblingExtension(".json") if fp.exists(): raise MigratePickleFileError(fp) return jsonfp def _dump_json_to_file(js, afile): """ Dump the JSON object `js` to the FilePath `afile` """ with afile.open("wb") as f: data = json.dumps(js) f.write(data.encode("utf8")) class _LeaseStateSerializer(object): """ Read and write state for LeaseCheckingCrawler. This understands how to read the legacy pickle format files and upgrade them to the new JSON format (which will occur automatically). 
""" def __init__(self, state_path): self._path = _confirm_json_format(FilePath(state_path)) def load(self): """ :returns: deserialized JSON state """ with self._path.open("rb") as f: return json.load(f) def save(self, data): """ Serialize the given data as JSON into the state-path :returns: None """ tmpfile = self._path.siblingExtension(".tmp") _dump_json_to_file(data, tmpfile) fileutil.move_into_place(tmpfile.path, self._path.path) return None class ShareCrawler(service.MultiService): """A ShareCrawler subclass is attached to a StorageServer, and periodically walks all of its shares, processing each one in some fashion. This crawl is rate-limited, to reduce the IO burden on the host, since large servers can easily have a terabyte of shares, in several million files, which can take hours or days to read. Once the crawler starts a cycle, it will proceed at a rate limited by the allowed_cpu_percentage= and cpu_slice= parameters: yielding the reactor after it has worked for 'cpu_slice' seconds, and not resuming right away, always trying to use less than 'allowed_cpu_percentage'. Once the crawler finishes a cycle, it will put off starting the next one long enough to ensure that 'minimum_cycle_time' elapses between the start of two consecutive cycles. We assume that the normal upload/download/get_buckets traffic of a tahoe grid will cause the prefixdir contents to be mostly cached in the kernel, or that the number of buckets in each prefixdir will be small enough to load quickly. A 1TB allmydata.com server was measured to have 2.56M buckets, spread into the 1024 prefixdirs, with about 2500 buckets per prefix. On this server, each prefixdir took 130ms-200ms to list the first time, and 17ms to list the second time. To use a crawler, create a subclass which implements the process_bucket() method. It will be called with a prefixdir and a base32 storage index string. process_bucket() must run synchronously. Any keys added to self.state will be preserved. Override add_initial_state() to set up initial state keys. Override finished_cycle() to perform additional processing when the cycle is complete. Any status that the crawler produces should be put in the self.state dictionary. Status renderers (like a web page which describes the accomplishments of your crawler) will use crawler.get_state() to retrieve this dictionary; they can present the contents as they see fit. Then create an instance, with a reference to a StorageServer and a filename where it can store persistent state. The statefile is used to keep track of how far around the ring the process has travelled, as well as timing history to allow the pace to be predicted and controlled. The statefile will be updated and written to disk after each time slice (just before the crawler yields to the reactor), and also after each cycle is finished, and also when stopService() is called. Note that this means that a crawler which is interrupted with SIGKILL while it is in the middle of a time slice will lose progress: the next time the node is started, the crawler will repeat some unknown amount of work. The crawler instance must be started with startService() before it will do any work. To make it stop doing work, call stopService(). 
""" slow_start = 300 # don't start crawling for 5 minutes after startup # all three of these can be changed at any time allowed_cpu_percentage = .10 # use up to 10% of the CPU, on average cpu_slice = 1.0 # use up to 1.0 seconds before yielding minimum_cycle_time = 300 # don't run a cycle faster than this def __init__(self, server, statefile, allowed_cpu_percentage=None): service.MultiService.__init__(self) if allowed_cpu_percentage is not None: self.allowed_cpu_percentage = allowed_cpu_percentage self.server = server self.sharedir = server.sharedir self._state_serializer = _LeaseStateSerializer(statefile) self.prefixes = [si_b2a(struct.pack(">H", i << (16-10)))[:2] for i in range(2**10)] self.prefixes = [p.decode("ascii") for p in self.prefixes] self.prefixes.sort() self.timer = None self.bucket_cache = (None, []) self.current_sleep_time = None self.next_wake_time = None self.last_prefix_finished_time = None self.last_prefix_elapsed_time = None self.last_cycle_started_time = None self.last_cycle_elapsed_time = None self.load_state() def minus_or_none(self, a, b): if a is None: return None return a-b def get_progress(self): """I return information about how much progress the crawler is making. My return value is a dictionary. The primary key is 'cycle-in-progress': True if the crawler is currently traversing the shares, False if it is idle between cycles. Note that any of these 'time' keys could be None if I am called at certain moments, so application code must be prepared to tolerate this case. The estimates will also be None if insufficient data has been gatherered to form an estimate. If cycle-in-progress is True, the following keys will be present:: cycle-complete-percentage': float, from 0.0 to 100.0, indicating how far the crawler has progressed through the current cycle remaining-sleep-time: float, seconds from now when we do more work estimated-cycle-complete-time-left: float, seconds remaining until the current cycle is finished. TODO: this does not yet include the remaining time left in the current prefixdir, and it will be very inaccurate on fast crawlers (which can process a whole prefix in a single tick) estimated-time-per-cycle: float, seconds required to do a complete cycle If cycle-in-progress is False, the following keys are available:: next-crawl-time: float, seconds-since-epoch when next crawl starts remaining-wait-time: float, seconds from now when next crawl starts estimated-time-per-cycle: float, seconds required to do a complete cycle """ d = {} if self.state["current-cycle"] is None: d["cycle-in-progress"] = False d["next-crawl-time"] = self.next_wake_time d["remaining-wait-time"] = self.minus_or_none(self.next_wake_time, time.time()) else: d["cycle-in-progress"] = True pct = 100.0 * self.last_complete_prefix_index / len(self.prefixes) d["cycle-complete-percentage"] = pct remaining = None if self.last_prefix_elapsed_time is not None: left = len(self.prefixes) - self.last_complete_prefix_index remaining = left * self.last_prefix_elapsed_time # TODO: remainder of this prefix: we need to estimate the # per-bucket time, probably by measuring the time spent on # this prefix so far, divided by the number of buckets we've # processed. 
d["estimated-cycle-complete-time-left"] = remaining # it's possible to call get_progress() from inside a crawler's # finished_prefix() function d["remaining-sleep-time"] = self.minus_or_none(self.next_wake_time, time.time()) per_cycle = None if self.last_cycle_elapsed_time is not None: per_cycle = self.last_cycle_elapsed_time elif self.last_prefix_elapsed_time is not None: per_cycle = len(self.prefixes) * self.last_prefix_elapsed_time d["estimated-time-per-cycle"] = per_cycle return d def get_state(self): """I return the current state of the crawler. This is a copy of my state dictionary. If we are not currently sleeping (i.e. get_state() was called from inside the process_prefixdir, process_bucket, or finished_cycle() methods, or if startService has not yet been called on this crawler), these two keys will be None. Subclasses can override this to add computed keys to the return value, but don't forget to start with the upcall. """ state = self.state.copy() # it isn't a deepcopy, so don't go crazy return state def load_state(self): # we use this to store state for both the crawler's internals and # anything the subclass-specific code needs. The state is stored # after each bucket is processed, after each prefixdir is processed, # and after a cycle is complete. The internal keys we use are: # ["version"]: int, always 1 # ["last-cycle-finished"]: int, or None if we have not yet finished # any cycle # ["current-cycle"]: int, or None if we are sleeping between cycles # ["current-cycle-start-time"]: int, seconds-since-epoch of when this # cycle was started, possibly by an earlier # process # ["last-complete-prefix"]: str, two-letter name of the last prefixdir # that was fully processed, or None if we # are sleeping between cycles, or if we # have not yet finished any prefixdir since # a cycle was started # ["last-complete-bucket"]: str, base32 storage index bucket name # of the last bucket to be processed, or # None if we are sleeping between cycles try: state = self._state_serializer.load() except Exception: state = {"version": 1, "last-cycle-finished": None, "current-cycle": None, "last-complete-prefix": None, "last-complete-bucket": None, } state.setdefault("current-cycle-start-time", time.time()) # approximate self.state = state lcp = state["last-complete-prefix"] if lcp == None: self.last_complete_prefix_index = -1 else: self.last_complete_prefix_index = self.prefixes.index(lcp) self.add_initial_state() def add_initial_state(self): """Hook method to add extra keys to self.state when first loaded. The first time this Crawler is used, or when the code has been upgraded, the saved state file may not contain all the keys you expect. Use this method to add any missing keys. Simply modify self.state as needed. This method for subclasses to override. No upcall is necessary. 
""" pass def save_state(self): lcpi = self.last_complete_prefix_index if lcpi == -1: last_complete_prefix = None else: last_complete_prefix = self.prefixes[lcpi] self.state["last-complete-prefix"] = last_complete_prefix self._state_serializer.save(self.get_state()) def startService(self): # arrange things to look like we were just sleeping, so # status/progress values work correctly self.sleeping_between_cycles = True self.current_sleep_time = self.slow_start self.next_wake_time = time.time() + self.slow_start self.timer = reactor.callLater(self.slow_start, self.start_slice) service.MultiService.startService(self) def stopService(self): if self.timer: self.timer.cancel() self.timer = None self.save_state() return service.MultiService.stopService(self) def start_slice(self): start_slice = time.time() self.timer = None self.sleeping_between_cycles = False self.current_sleep_time = None self.next_wake_time = None try: self.start_current_prefix(start_slice) finished_cycle = True except TimeSliceExceeded: finished_cycle = False self.save_state() if not self.running: # someone might have used stopService() to shut us down return # either we finished a whole cycle, or we ran out of time now = time.time() this_slice = now - start_slice # this_slice/(this_slice+sleep_time) = percentage # this_slice/percentage = this_slice+sleep_time # sleep_time = (this_slice/percentage) - this_slice sleep_time = (this_slice / self.allowed_cpu_percentage) - this_slice # if the math gets weird, or a timequake happens, don't sleep # forever. Note that this means that, while a cycle is running, we # will process at least one bucket every 5 minutes, no matter how # long that bucket takes. sleep_time = max(0.0, min(sleep_time, 299)) if finished_cycle: # how long should we sleep between cycles? Don't run faster than # allowed_cpu_percentage says, but also run faster than # minimum_cycle_time self.sleeping_between_cycles = True sleep_time = max(sleep_time, self.minimum_cycle_time) else: self.sleeping_between_cycles = False self.current_sleep_time = sleep_time # for status page self.next_wake_time = now + sleep_time self.yielding(sleep_time) self.timer = reactor.callLater(sleep_time, self.start_slice) def start_current_prefix(self, start_slice): state = self.state if state["current-cycle"] is None: self.last_cycle_started_time = time.time() state["current-cycle-start-time"] = self.last_cycle_started_time if state["last-cycle-finished"] is None: state["current-cycle"] = 0 else: state["current-cycle"] = state["last-cycle-finished"] + 1 self.started_cycle(state["current-cycle"]) cycle = state["current-cycle"] for i in range(self.last_complete_prefix_index+1, len(self.prefixes)): # if we want to yield earlier, just raise TimeSliceExceeded() prefix = self.prefixes[i] prefixdir = os.path.join(self.sharedir, prefix) if i == self.bucket_cache[0]: buckets = self.bucket_cache[1] else: try: buckets = os.listdir(prefixdir) buckets.sort() except EnvironmentError: buckets = [] self.bucket_cache = (i, buckets) self.process_prefixdir(cycle, prefix, prefixdir, buckets, start_slice) self.last_complete_prefix_index = i now = time.time() if self.last_prefix_finished_time is not None: elapsed = now - self.last_prefix_finished_time self.last_prefix_elapsed_time = elapsed self.last_prefix_finished_time = now self.finished_prefix(cycle, prefix) if time.time() >= start_slice + self.cpu_slice: raise TimeSliceExceeded() # yay! 
        # yay! we finished the whole cycle
        self.last_complete_prefix_index = -1
        self.last_prefix_finished_time = None # don't include the sleep
        now = time.time()
        if self.last_cycle_started_time is not None:
            self.last_cycle_elapsed_time = now - self.last_cycle_started_time
        state["last-complete-bucket"] = None
        state["last-cycle-finished"] = cycle
        state["current-cycle"] = None
        self.finished_cycle(cycle)
        self.save_state()

    def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
        """This gets a list of bucket names (i.e. storage index strings,
        base32-encoded) in sorted order.

        You can override this if your crawler doesn't care about the actual
        shares, for example a crawler which merely keeps track of how many
        buckets are being managed by this server.

        Subclasses which *do* care about actual buckets should leave this
        method alone, and implement process_bucket() instead.
        """
        for bucket in buckets:
            last_complete = self.state["last-complete-bucket"]
            if last_complete is not None and bucket <= last_complete:
                continue
            self.process_bucket(cycle, prefix, prefixdir, bucket)
            self.state["last-complete-bucket"] = bucket
            if time.time() >= start_slice + self.cpu_slice:
                raise TimeSliceExceeded()

    # the remaining methods are explicitly for subclasses to implement.

    def started_cycle(self, cycle):
        """Notify a subclass that the crawler is about to start a cycle.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        """Examine a single bucket. Subclasses should do whatever they want
        to do to the shares therein, then update self.state as necessary.

        If the crawler is never interrupted by SIGKILL, this method will be
        called exactly once per share (per cycle). If it *is* interrupted,
        then the next time the node is started, some amount of work will be
        duplicated, according to when self.save_state() was last called. By
        default, save_state() is called at the end of each timeslice, and
        after finished_cycle() returns, and when stopService() is called.

        To reduce the chance of duplicate work (i.e. to avoid adding
        multiple records to a database), you can call save_state() at the
        end of your process_bucket() method. This will reduce the maximum
        duplicated work to one bucket per SIGKILL. It will also add
        overhead, probably 1-20ms per bucket (and some disk writes), which
        will count against your allowed_cpu_percentage, and which may be
        considerable if process_bucket() runs quickly.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def finished_prefix(self, cycle, prefix):
        """Notify a subclass that the crawler has just finished processing a
        prefix directory (all buckets with the same two-character/10-bit
        prefix). To impose a limit on how much work might be duplicated by
        a SIGKILL that occurs during a timeslice, you can call
        self.save_state() here, but be aware that it may represent a
        significant performance hit.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def finished_cycle(self, cycle):
        """Notify subclass that a cycle (one complete traversal of all
        prefixdirs) has just finished. 'cycle' is the number of the cycle
        that just finished. This method should perform summary work and
        update self.state to publish information to status displays.

        One-shot crawlers, such as those used to upgrade shares to a new
        format or populate a database for the first time, can call
        self.stopService() (or more likely self.disownServiceParent()) to
        prevent it from running a second time.
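
        As a sketch, a hypothetical one-shot upgrader could end its only
        cycle like this (the state key is illustrative)::

            def finished_cycle(self, cycle):
                self.state["upgrade-finished"] = True
                self.disownServiceParent()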
        Don't forget to set some persistent state so that the upgrader won't
        be run again the next time the node is started.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def yielding(self, sleep_time):
        """The crawler is about to sleep for 'sleep_time' seconds. This
        method is mostly for the convenience of unit tests.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass


class BucketCountingCrawler(ShareCrawler):
    """I keep track of how many buckets are being managed by this server.
    This is equivalent to the number of distributed files and directories
    for which I am providing storage. The actual number of files+directories
    in the full grid is probably higher (especially when there are more
    servers than 'N', the number of generated shares), because some
    files+directories will have shares on other servers instead of me. Also
    note that the number of buckets will differ from the number of shares
    in small grids, when more than one share is placed on a single server.
    """

    minimum_cycle_time = 60*60 # we don't need this more than once an hour

    def __init__(self, server, statefile, num_sample_prefixes=1):
        ShareCrawler.__init__(self, server, statefile)
        self.num_sample_prefixes = num_sample_prefixes

    def add_initial_state(self):
        # ["bucket-counts"][cyclenum][prefix] = number
        # ["last-complete-cycle"] = cyclenum # maintained by base class
        # ["last-complete-bucket-count"] = number
        # ["storage-index-samples"][prefix] = (cyclenum,
        #                                      list of SI strings (base32))
        self.state.setdefault("bucket-counts", {})
        self.state.setdefault("last-complete-bucket-count", None)
        self.state.setdefault("storage-index-samples", {})

    def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
        # we override process_prefixdir() because we don't want to look at
        # the individual buckets. We'll save state after each one. On my
        # laptop, a mostly-empty storage server can process about 70
        # prefixdirs in a 1.0s slice.
        if cycle not in self.state["bucket-counts"]:
            self.state["bucket-counts"][cycle] = {}
        self.state["bucket-counts"][cycle][prefix] = len(buckets)
        if prefix in self.prefixes[:self.num_sample_prefixes]:
            self.state["storage-index-samples"][prefix] = (cycle, buckets)

    def finished_cycle(self, cycle):
        last_counts = self.state["bucket-counts"].get(cycle, [])
        if len(last_counts) == len(self.prefixes):
            # great, we have a whole cycle.
            num_buckets = sum(last_counts.values())
            self.state["last-complete-bucket-count"] = num_buckets
            # get rid of old counts
            for old_cycle in list(self.state["bucket-counts"].keys()):
                if old_cycle != cycle:
                    del self.state["bucket-counts"][old_cycle]
        # get rid of old samples too
        for prefix in list(self.state["storage-index-samples"].keys()):
            old_cycle, buckets = self.state["storage-index-samples"][prefix]
            if old_cycle != cycle:
                del self.state["storage-index-samples"][prefix]
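
# Illustrative wiring only: "server" is assumed to be a StorageServer that is
# also a service parent, and the state-file name is an arbitrary example.
#
#   counter = BucketCountingCrawler(server, "bucket_counter.state")
#   counter.setServiceParent(server)
#   # crawling begins slow_start seconds after startService() is called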