Ticket #1363: 1363-patch1.diff

File 1363-patch1.diff, 39.5 KB (added by warner, at 2011-02-20T21:15:00Z)

first refactoring step

  • src/allmydata/client.py

    diff --git a/src/allmydata/client.py b/src/allmydata/client.py
    index fa515d4..fb7e0c8 100644
    a b class Client(node.Node, pollmixin.PollMixin): 
    468468        temporary test network and need to know when it is safe to proceed
    469469        with an upload or download."""
    470470        def _check():
    471             return len(self.storage_broker.get_all_servers()) >= num_clients
     471            return len(self.storage_broker.get_connected_servers()) >= num_clients
    472472        d = self.poll(_check, 0.5)
    473473        d.addCallback(lambda res: None)
    474474        return d
  • src/allmydata/control.py

    diff --git a/src/allmydata/control.py b/src/allmydata/control.py
    index cb0c84f..045a34e 100644
    a b class ControlServer(Referenceable, service.Service): 
    9191        # 300ms.
    9292        results = {}
    9393        sb = self.parent.get_storage_broker()
    94         everyone = sb.get_all_servers()
     94        everyone = sb.get_connected_servers()
    9595        num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
    9696        everyone = list(everyone) * num_pings
    9797        d = self._do_one_ping(None, everyone, results)
    class ControlServer(Referenceable, service.Service): 
    9999    def _do_one_ping(self, res, everyone_left, results):
    100100        if not everyone_left:
    101101            return results
    102         peerid, connection = everyone_left.pop(0)
     102        server = everyone_left.pop(0)
     103        peerid = server.get_serverid()
     104        connection = server.get_rref()
    103105        start = time.time()
    104106        d = connection.callRemote("get_buckets", "\x00"*16)
    105107        def _done(ignored):
  • src/allmydata/immutable/checker.py

    diff --git a/src/allmydata/immutable/checker.py b/src/allmydata/immutable/checker.py
    index cd5c556..f350029 100644
    a b class Checker(log.PrefixingLogMixin): 
    463463    def __init__(self, verifycap, servers, verify, add_lease, secret_holder,
    464464                 monitor):
    465465        assert precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap))
    466         assert precondition(isinstance(servers, (set, frozenset)), servers)
    467         for (serverid, serverrref) in servers:
    468             assert precondition(isinstance(serverid, str))
    469466
    470467        prefix = "%s" % base32.b2a_l(verifycap.get_storage_index()[:8], 60)
    471468        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix)
    class Checker(log.PrefixingLogMixin): 
    489486    def _get_cancel_secret(self, peerid):
    490487        return bucket_cancel_secret_hash(self.file_cancel_secret, peerid)
    491488
    492     def _get_buckets(self, server, storageindex, serverid):
     489    def _get_buckets(self, s, storageindex):
    493490        """Return a deferred that eventually fires with ({sharenum: bucket},
    494491        serverid, success). In case the server is disconnected or returns a
    495492        Failure then it fires with ({}, serverid, False) (A server
    class Checker(log.PrefixingLogMixin): 
    498495        that we want to track and report whether or not each server
    499496        responded.)"""
    500497
     498        rref = s.get_rref()
     499        serverid = s.get_serverid()
    501500        if self._add_lease:
    502501            renew_secret = self._get_renewal_secret(serverid)
    503502            cancel_secret = self._get_cancel_secret(serverid)
    504             d2 = server.callRemote("add_lease", storageindex,
    505                                    renew_secret, cancel_secret)
     503            d2 = rref.callRemote("add_lease", storageindex,
     504                                 renew_secret, cancel_secret)
    506505            d2.addErrback(self._add_lease_failed, serverid, storageindex)
    507506
    508         d = server.callRemote("get_buckets", storageindex)
     507        d = rref.callRemote("get_buckets", storageindex)
    509508        def _wrap_results(res):
    510509            return (res, serverid, True)
    511510
    class Checker(log.PrefixingLogMixin): 
    656655
    657656        return d
    658657
    659     def _verify_server_shares(self, serverid, ss):
     658    def _verify_server_shares(self, s):
    660659        """ Return a deferred which eventually fires with a tuple of
    661660        (set(sharenum), serverid, set(corruptsharenum),
    662661        set(incompatiblesharenum), success) showing all the shares verified
    class Checker(log.PrefixingLogMixin): 
    679678        then disconnected and ceased responding, or returned a failure, it is
    680679        still marked with the True flag for 'success'.
    681680        """
    682         d = self._get_buckets(ss, self._verifycap.get_storage_index(), serverid)
     681        d = self._get_buckets(s, self._verifycap.get_storage_index())
    683682
    684683        def _got_buckets(result):
    685684            bucketdict, serverid, success = result
    class Checker(log.PrefixingLogMixin): 
    710709
    711710        def _err(f):
    712711            f.trap(RemoteException, DeadReferenceError)
    713             return (set(), serverid, set(), set(), False)
     712            return (set(), s.get_serverid(), set(), set(), False)
    714713
    715714        d.addCallbacks(_got_buckets, _err)
    716715        return d
    717716
    718     def _check_server_shares(self, serverid, ss):
     717    def _check_server_shares(self, s):
    719718        """Return a deferred which eventually fires with a tuple of
    720719        (set(sharenum), serverid, set(), set(), responded) showing all the
    721720        shares claimed to be served by this server. In case the server is
    class Checker(log.PrefixingLogMixin): 
    726725        def _curry_empty_corrupted(res):
    727726            buckets, serverid, responded = res
    728727            return (set(buckets), serverid, set(), set(), responded)
    729         d = self._get_buckets(ss, self._verifycap.get_storage_index(), serverid)
     728        d = self._get_buckets(s, self._verifycap.get_storage_index())
    730729        d.addCallback(_curry_empty_corrupted)
    731730        return d
    732731
    class Checker(log.PrefixingLogMixin): 
    794793    def start(self):
    795794        ds = []
    796795        if self._verify:
    797             for (serverid, ss) in self._servers:
    798                 ds.append(self._verify_server_shares(serverid, ss))
     796            for s in self._servers:
     797                ds.append(self._verify_server_shares(s))
    799798        else:
    800             for (serverid, ss) in self._servers:
    801                 ds.append(self._check_server_shares(serverid, ss))
     799            for s in self._servers:
     800                ds.append(self._check_server_shares(s))
    802801
    803802        return deferredutil.gatherResults(ds).addCallback(self._format_results)
  • src/allmydata/immutable/downloader/finder.py

    diff --git a/src/allmydata/immutable/downloader/finder.py b/src/allmydata/immutable/downloader/finder.py
    index 4816ccd..f1142e7 100644
    a b class ShareFinder: 
    6262        # test_dirnode, which creates us with storage_broker=None
    6363        if not self._started:
    6464            si = self.verifycap.storage_index
    65             s = self._storage_broker.get_servers_for_index(si)
    66             self._servers = iter(s)
     65            servers = [(s.get_serverid(), s.get_rref())
     66                       for s in self._storage_broker.get_servers_for_psi(si)]
     67            self._servers = iter(servers)
    6768            self._started = True
    6869
    6970    def log(self, *args, **kwargs):
  • src/allmydata/immutable/filenode.py

    diff --git a/src/allmydata/immutable/filenode.py b/src/allmydata/immutable/filenode.py
    index ed3785b..8fc4772 100644
    a b class CiphertextFileNode: 
    102102        verifycap = self._verifycap
    103103        storage_index = verifycap.storage_index
    104104        sb = self._storage_broker
    105         servers = sb.get_all_servers()
     105        servers = sb.get_connected_servers()
    106106        sh = self._secret_holder
    107107
    108108        c = Checker(verifycap=verifycap, servers=servers,
    class CiphertextFileNode: 
    160160    def check(self, monitor, verify=False, add_lease=False):
    161161        verifycap = self._verifycap
    162162        sb = self._storage_broker
    163         servers = sb.get_all_servers()
     163        servers = sb.get_connected_servers()
    164164        sh = self._secret_holder
    165165
    166166        v = Checker(verifycap=verifycap, servers=servers,
  • src/allmydata/immutable/offloaded.py

    diff --git a/src/allmydata/immutable/offloaded.py b/src/allmydata/immutable/offloaded.py
    index dea94c5..26be786 100644
    a b class CHKCheckerAndUEBFetcher: 
    5353
    5454    def _get_all_shareholders(self, storage_index):
    5555        dl = []
    56         for (peerid, ss) in self._peer_getter(storage_index):
    57             d = ss.callRemote("get_buckets", storage_index)
     56        for s in self._peer_getter(storage_index):
     57            d = s.get_rref().callRemote("get_buckets", storage_index)
    5858            d.addCallbacks(self._got_response, self._got_error,
    59                            callbackArgs=(peerid,))
     59                           callbackArgs=(s.get_serverid(),))
    6060            dl.append(d)
    6161        return defer.DeferredList(dl)
    6262
    class Helper(Referenceable): 
    620620        lp2 = self.log("doing a quick check+UEBfetch",
    621621                       parent=lp, level=log.NOISY)
    622622        sb = self._storage_broker
    623         c = CHKCheckerAndUEBFetcher(sb.get_servers_for_index, storage_index, lp2)
     623        c = CHKCheckerAndUEBFetcher(sb.get_servers_for_psi, storage_index, lp2)
    624624        d = c.check()
    625625        def _checked(res):
    626626            if res:
  • src/allmydata/immutable/upload.py

    diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py
    index 5dd257b..a53f12a 100644
    a b class Tahoe2PeerSelector(log.PrefixingLogMixin): 
    224224                                             num_share_hashes, EXTENSION_SIZE,
    225225                                             None)
    226226        allocated_size = wbp.get_allocated_size()
    227         all_peers = storage_broker.get_servers_for_index(storage_index)
     227        all_peers = [(s.get_serverid(), s.get_rref())
     228                     for s in storage_broker.get_servers_for_psi(storage_index)]
    228229        if not all_peers:
    229230            raise NoServersError("client gave us zero peers")
    230231
  • src/allmydata/interfaces.py

    diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py
    index 48094a9..85c3e07 100644
    a b class IStorageBucketReader(Interface): 
    352352        """
    353353
    354354class IStorageBroker(Interface):
    355     def get_servers_for_index(peer_selection_index):
     355    def get_servers_for_psi(peer_selection_index):
    356356        """
    357         @return: list of (peerid, versioned-rref) tuples
     357        @return: list of IServer instances
    358358        """
    359     def get_all_servers():
     359    def get_connected_servers():
    360360        """
    361         @return: frozenset of (peerid, versioned-rref) tuples
     361        @return: frozenset of connected IServer instances
     362        """
     363    def get_known_servers():
     364        """
     365        @return: frozenset of IServer instances
    362366        """
    363367    def get_all_serverids():
    364368        """
  • src/allmydata/mutable/publish.py

    diff --git a/src/allmydata/mutable/publish.py b/src/allmydata/mutable/publish.py
    index 2d63c87..580682b 100644
    a b class Publish: 
    179179        self._encprivkey = self._node.get_encprivkey()
    180180
    181181        sb = self._storage_broker
    182         full_peerlist = sb.get_servers_for_index(self._storage_index)
     182        full_peerlist = [(s.get_serverid(), s.get_rref())
     183                         for s in sb.get_servers_for_psi(self._storage_index)]
    183184        self.full_peerlist = full_peerlist # for use later, immutable
    184185        self.bad_peers = set() # peerids who have errbacked/refused requests
    185186
  • src/allmydata/mutable/servermap.py

    diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py
    index 999691f..c69e410 100644
    a b class ServermapUpdater: 
    424424        self._queries_completed = 0
    425425
    426426        sb = self._storage_broker
    427         full_peerlist = sb.get_servers_for_index(self._storage_index)
     427        full_peerlist = [(s.get_serverid(), s.get_rref())
     428                         for s in sb.get_servers_for_psi(self._storage_index)]
    428429        self.full_peerlist = full_peerlist # for use later, immutable
    429430        self.extra_peers = full_peerlist[:] # peers are removed as we use them
    430431        self._good_peers = set() # peers who had some shares
  • src/allmydata/storage_client.py

    diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py
    index 7945eac..e1b3f55 100644
    a b from zope.interface import implements, Interface 
    3434from foolscap.api import eventually
    3535from allmydata.interfaces import IStorageBroker
    3636from allmydata.util import idlib, log
    37 from allmydata.util.assertutil import _assert, precondition
     37from allmydata.util.assertutil import precondition
    3838from allmydata.util.rrefutil import add_version_to_remote_reference
    3939from allmydata.util.hashutil import sha1
    4040
    class StorageFarmBroker: 
    6666        self.tub = tub
    6767        assert permute_peers # False not implemented yet
    6868        self.permute_peers = permute_peers
    69         # self.descriptors maps serverid -> IServerDescriptor, and keeps
    70         # track of all the storage servers that we've heard about. Each
    71         # descriptor manages its own Reconnector, and will give us a
    72         # RemoteReference when we ask them for it.
    73         self.descriptors = {}
     69        # self.servers maps serverid -> IServer, and keeps track of all the
     70        # storage servers that we've heard about. Each descriptor manages its
     71        # own Reconnector, and will give us a RemoteReference when we ask
     72        # them for it.
     73        self.servers = {}
    7474        # self.test_servers are statically configured from unit tests
    7575        self.test_servers = {} # serverid -> rref
    7676        self.introducer_client = None
    class StorageFarmBroker: 
    7979    def test_add_server(self, serverid, rref):
    8080        self.test_servers[serverid] = rref
    8181    def test_add_descriptor(self, serverid, dsc):
    82         self.descriptors[serverid] = dsc
     82        self.servers[serverid] = dsc
    8383
    8484    def use_introducer(self, introducer_client):
    8585        self.introducer_client = ic = introducer_client
    class StorageFarmBroker: 
    8989        precondition(isinstance(serverid, str), serverid)
    9090        precondition(len(serverid) == 20, serverid)
    9191        assert ann_d["service-name"] == "storage"
    92         old = self.descriptors.get(serverid)
     92        old = self.servers.get(serverid)
    9393        if old:
    9494            if old.get_announcement() == ann_d:
    9595                return # duplicate
    9696            # replacement
    97             del self.descriptors[serverid]
     97            del self.servers[serverid]
    9898            old.stop_connecting()
    9999            # now we forget about them and start using the new one
    100         dsc = NativeStorageClientDescriptor(serverid, ann_d)
    101         self.descriptors[serverid] = dsc
     100        dsc = NativeStorageServer(serverid, ann_d)
     101        self.servers[serverid] = dsc
    102102        dsc.start_connecting(self.tub, self._trigger_connections)
    103103        # the descriptor will manage their own Reconnector, and each time we
    104104        # need servers, we'll ask them if they're connected or not.
    class StorageFarmBroker: 
    111111        # connections to only a subset of the servers, which would increase
    112112        # the chances that we'll put shares in weird places (and not update
    113113        # existing shares of mutable files). See #374 for more details.
    114         for dsc in self.descriptors.values():
     114        for dsc in self.servers.values():
    115115            dsc.try_to_connect()
    116116
    117 
    118 
    119     def get_servers_for_index(self, peer_selection_index):
    120         # first cut: return a list of (peerid, versioned-rref) tuples
     117    def get_servers_for_psi(self, peer_selection_index):
     118        # return a list of server objects (IServers)
    121119        assert self.permute_peers == True
    122         servers = self.get_all_servers()
    123         key = peer_selection_index
    124         return sorted(servers, key=lambda x: sha1(key+x[0]).digest())
    125 
    126     def get_all_servers(self):
    127         # return a frozenset of (peerid, versioned-rref) tuples
    128         servers = {}
    129         for serverid,rref in self.test_servers.items():
    130             servers[serverid] = rref
    131         for serverid,dsc in self.descriptors.items():
    132             rref = dsc.get_rref()
    133             if rref:
    134                 servers[serverid] = rref
    135         result = frozenset(servers.items())
    136         _assert(len(result) <= len(self.get_all_serverids()), result, self.get_all_serverids())
    137         return result
     120        def _permuted(server):
     121            seed = server.get_permutation_seed()
     122            return sha1(peer_selection_index + seed).digest()
     123        return sorted(self.get_connected_servers(), key=_permuted)
    138124
    139125    def get_all_serverids(self):
    140126        serverids = set()
    141127        serverids.update(self.test_servers.keys())
    142         serverids.update(self.descriptors.keys())
     128        serverids.update(self.servers.keys())
    143129        return frozenset(serverids)
    144130
    145     def get_all_descriptors(self):
    146         return sorted(self.descriptors.values(),
    147                       key=lambda dsc: dsc.get_serverid())
     131    def get_connected_servers(self):
     132        return frozenset([s for s in self.get_known_servers()
     133                          if s.get_rref()])
     134
     135    def get_known_servers(self):
     136        servers = []
     137        for serverid,rref in self.test_servers.items():
     138            s = NativeStorageServer(serverid, {})
     139            s.rref = rref
     140            servers.append(s)
     141        servers.extend(self.servers.values())
     142        return sorted(servers, key=lambda s: s.get_serverid())
    148143
    149144    def get_nickname_for_serverid(self, serverid):
    150         if serverid in self.descriptors:
    151             return self.descriptors[serverid].get_nickname()
     145        if serverid in self.servers:
     146            return self.servers[serverid].get_nickname()
    152147        return None
    153148
    154149
    155 class IServerDescriptor(Interface):
     150class IServer(Interface):
     151    """I live in the client, and represent a single server."""
    156152    def start_connecting(tub, trigger_cb):
    157153        pass
    158154    def get_nickname():
    class IServerDescriptor(Interface): 
    160156    def get_rref():
    161157        pass
    162158
    163 class NativeStorageClientDescriptor:
     159class NativeStorageServer:
    164160    """I hold information about a storage server that we want to connect to.
    165161    If we are connected, I hold the RemoteReference, their host address, and
    166162    the their version information. I remember information about when we were
    class NativeStorageClientDescriptor: 
    176172    @ivar rref: the RemoteReference, if connected, otherwise None
    177173    @ivar remote_host: the IAddress, if connected, otherwise None
    178174    """
    179     implements(IServerDescriptor)
     175    implements(IServer)
    180176
    181177    VERSION_DEFAULTS = {
    182178        "http://allmydata.org/tahoe/protocols/storage/v1" :
    class NativeStorageClientDescriptor: 
    203199
    204200    def get_serverid(self):
    205201        return self.serverid
     202    def get_permutation_seed(self):
     203        return self.serverid
    206204
    207205    def get_nickname(self):
    208206        return self.announcement["nickname"].decode("utf-8")
  • src/allmydata/test/common.py

    diff --git a/src/allmydata/test/common.py b/src/allmydata/test/common.py
    index e8117bf..dfcaa10 100644
    a b class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): 
    572572            if not c.connected_to_introducer():
    573573                return False
    574574            sb = c.get_storage_broker()
    575             if len(sb.get_all_servers()) != self.numclients:
     575            if len(sb.get_connected_servers()) != self.numclients:
    576576                return False
    577577        return True
    578578
  • src/allmydata/test/no_network.py

    diff --git a/src/allmydata/test/no_network.py b/src/allmydata/test/no_network.py
    index 554a75f..f07c36b 100644
    a b def wrap_storage_server(original): 
    117117    wrapper.version = original.remote_get_version()
    118118    return wrapper
    119119
     120class NoNetworkServer:
     121    def __init__(self, serverid, rref):
     122        self.serverid = serverid
     123        self.rref = rref
     124    def get_serverid(self):
     125        return self.serverid
     126    def get_permutation_seed(self):
     127        return self.serverid
     128    def get_rref(self):
     129        return self.rref
     130
    120131class NoNetworkStorageBroker:
    121132    implements(IStorageBroker)
    122     def get_servers_for_index(self, key):
    123         return sorted(self.client._servers,
    124                       key=lambda x: sha1(key+x[0]).digest())
    125     def get_all_servers(self):
    126         return frozenset(self.client._servers)
     133    def get_servers_for_psi(self, peer_selection_index):
     134        def _permuted(server):
     135            seed = server.get_permutation_seed()
     136            return sha1(peer_selection_index + seed).digest()
     137        return sorted(self.get_connected_servers(), key=_permuted)
     138    def get_connected_servers(self):
     139        return self.client._servers
    127140    def get_nickname_for_serverid(self, serverid):
    128141        return None
    129142
    class NoNetworkGrid(service.MultiService): 
    181194        self.basedir = basedir
    182195        fileutil.make_dirs(basedir)
    183196
    184         self.servers_by_number = {}
    185         self.servers_by_id = {}
     197        self.servers_by_number = {} # maps to StorageServer instance
     198        self.wrappers_by_id = {} # maps to wrapped StorageServer instance
     199        self.proxies_by_id = {} # maps to IServer on which .rref is a wrapped
     200                                # StorageServer
    186201        self.clients = []
    187202
    188203        for i in range(num_servers):
    class NoNetworkGrid(service.MultiService): 
    234249        ss.setServiceParent(middleman)
    235250        serverid = ss.my_nodeid
    236251        self.servers_by_number[i] = ss
    237         self.servers_by_id[serverid] = wrap_storage_server(ss)
     252        wrapper = wrap_storage_server(ss)
     253        self.wrappers_by_id[serverid] = wrapper
     254        self.proxies_by_id[serverid] = NoNetworkServer(serverid, wrapper)
    238255        self.rebuild_serverlist()
    239256
     257    def get_all_serverids(self):
     258        return self.proxies_by_id.keys()
     259
    240260    def rebuild_serverlist(self):
    241         self.all_servers = frozenset(self.servers_by_id.items())
     261        self.all_servers = frozenset(self.proxies_by_id.values())
    242262        for c in self.clients:
    243263            c._servers = self.all_servers
    244264
    class NoNetworkGrid(service.MultiService): 
    249269            if ss.my_nodeid == serverid:
    250270                del self.servers_by_number[i]
    251271                break
    252         del self.servers_by_id[serverid]
     272        del self.wrappers_by_id[serverid]
     273        del self.proxies_by_id[serverid]
    253274        self.rebuild_serverlist()
    254275
    255276    def break_server(self, serverid):
    256277        # mark the given server as broken, so it will throw exceptions when
    257278        # asked to hold a share or serve a share
    258         self.servers_by_id[serverid].broken = True
     279        self.wrappers_by_id[serverid].broken = True
    259280
    260281    def hang_server(self, serverid):
    261282        # hang the given server
    262         ss = self.servers_by_id[serverid]
     283        ss = self.wrappers_by_id[serverid]
    263284        assert ss.hung_until is None
    264285        ss.hung_until = defer.Deferred()
    265286
    266287    def unhang_server(self, serverid):
    267288        # unhang the given server
    268         ss = self.servers_by_id[serverid]
     289        ss = self.wrappers_by_id[serverid]
    269290        assert ss.hung_until is not None
    270291        ss.hung_until.callback(None)
    271292        ss.hung_until = None
  • src/allmydata/test/test_checker.py

    diff --git a/src/allmydata/test/test_checker.py b/src/allmydata/test/test_checker.py
    index 7548d7c..d1cf86f 100644
    a b import simplejson 
    33from twisted.trial import unittest
    44from allmydata import check_results, uri
    55from allmydata.web import check_results as web_check_results
    6 from allmydata.storage_client import StorageFarmBroker, NativeStorageClientDescriptor
     6from allmydata.storage_client import StorageFarmBroker, NativeStorageServer
    77from allmydata.monitor import Monitor
    88from allmydata.test.no_network import GridTestMixin
    99from allmydata.immutable.upload import Data
    class WebResultsRendering(unittest.TestCase, WebRenderingMixin): 
    2828                      "my-version": "ver",
    2929                      "oldest-supported": "oldest",
    3030                      }
    31             dsc = NativeStorageClientDescriptor(peerid, ann_d)
     31            dsc = NativeStorageServer(peerid, ann_d)
    3232            sb.test_add_descriptor(peerid, dsc)
    3333        c = FakeClient()
    3434        c.storage_broker = sb
  • src/allmydata/test/test_client.py

    diff --git a/src/allmydata/test/test_client.py b/src/allmydata/test/test_client.py
    index 7c6a456..abf63aa 100644
    a b class Basic(testutil.ReallyEqualMixin, unittest.TestCase): 
    134134        self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 0)
    135135
    136136    def _permute(self, sb, key):
    137         return [ peerid
    138                  for (peerid,rref) in sb.get_servers_for_index(key) ]
     137        return [ s.get_serverid() for s in sb.get_servers_for_psi(key) ]
    139138
    140139    def test_permute(self):
    141140        sb = StorageFarmBroker(None, True)
    142141        for k in ["%d" % i for i in range(5)]:
    143             sb.test_add_server(k, None)
     142            sb.test_add_server(k, "rref")
    144143
    145144        self.failUnlessReallyEqual(self._permute(sb, "one"), ['3','1','0','4','2'])
    146145        self.failUnlessReallyEqual(self._permute(sb, "two"), ['0','4','2','1','3'])
  • src/allmydata/test/test_deepcheck.py

    diff --git a/src/allmydata/test/test_deepcheck.py b/src/allmydata/test/test_deepcheck.py
    index 5168b37..4bcfe28 100644
    a b class DeepCheckWebGood(DeepCheckBase, unittest.TestCase): 
    287287        self.failUnlessEqual(d["list-corrupt-shares"], [], where)
    288288        if not incomplete:
    289289            self.failUnlessEqual(sorted(d["servers-responding"]),
    290                                  sorted(self.g.servers_by_id.keys()),
     290                                 sorted(self.g.get_all_serverids()),
    291291                                 where)
    292292            self.failUnless("sharemap" in d, str((where, d)))
    293293            all_serverids = set()
    294294            for (shareid, serverids) in d["sharemap"].items():
    295295                all_serverids.update(serverids)
    296296            self.failUnlessEqual(sorted(all_serverids),
    297                                  sorted(self.g.servers_by_id.keys()),
     297                                 sorted(self.g.get_all_serverids()),
    298298                                 where)
    299299
    300300        self.failUnlessEqual(d["count-wrong-shares"], 0, where)
    class DeepCheckWebGood(DeepCheckBase, unittest.TestCase): 
    545545        if not incomplete:
    546546            self.failUnlessEqual(sorted(r["servers-responding"]),
    547547                                 sorted([idlib.nodeid_b2a(sid)
    548                                          for sid in self.g.servers_by_id]),
     548                                         for sid in self.g.get_all_serverids()]),
    549549                                 where)
    550550            self.failUnless("sharemap" in r, where)
    551551            all_serverids = set()
    class DeepCheckWebGood(DeepCheckBase, unittest.TestCase): 
    553553                all_serverids.update(serverids_s)
    554554            self.failUnlessEqual(sorted(all_serverids),
    555555                                 sorted([idlib.nodeid_b2a(sid)
    556                                          for sid in self.g.servers_by_id]),
     556                                         for sid in self.g.get_all_serverids()]),
    557557                                 where)
    558558        self.failUnlessEqual(r["count-wrong-shares"], 0, where)
    559559        self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
  • src/allmydata/test/test_download.py

    diff --git a/src/allmydata/test/test_download.py b/src/allmydata/test/test_download.py
    index 3a69220..ab2e98b 100644
    a b class DownloadTest(_Base, unittest.TestCase): 
    596596        # tweak the client's copies of server-version data, so it believes
    597597        # that they're old and can't handle reads that overrun the length of
    598598        # the share. This exercises a different code path.
    599         for (peerid, rref) in self.c0.storage_broker.get_all_servers():
     599        for s in self.c0.storage_broker.get_connected_servers():
     600            rref = s.get_rref()
    600601            v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
    601602            v1["tolerates-immutable-read-overrun"] = False
    602603
    class DownloadV2(_Base, unittest.TestCase): 
    11671168        # tweak the client's copies of server-version data, so it believes
    11681169        # that they're old and can't handle reads that overrun the length of
    11691170        # the share. This exercises a different code path.
    1170         for (peerid, rref) in self.c0.storage_broker.get_all_servers():
     1171        for s in self.c0.storage_broker.get_connected_servers():
     1172            rref = s.get_rref()
    11711173            v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
    11721174            v1["tolerates-immutable-read-overrun"] = False
    11731175
    class DownloadV2(_Base, unittest.TestCase): 
    11861188        self.set_up_grid()
    11871189        self.c0 = self.g.clients[0]
    11881190
    1189         for (peerid, rref) in self.c0.storage_broker.get_all_servers():
     1191        for s in self.c0.storage_broker.get_connected_servers():
     1192            rref = s.get_rref()
    11901193            v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
    11911194            v1["tolerates-immutable-read-overrun"] = False
    11921195
  • src/allmydata/test/test_hung_server.py

    diff --git a/src/allmydata/test/test_hung_server.py b/src/allmydata/test/test_hung_server.py
    index e63cba3..abed967 100644
    a b class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin, 
    101101
    102102        self.c0 = self.g.clients[0]
    103103        nm = self.c0.nodemaker
    104         self.servers = sorted([(id, ss) for (id, ss) in nm.storage_broker.get_all_servers()])
     104        self.servers = sorted([(s.get_serverid(), s.get_rref())
     105                               for s in nm.storage_broker.get_connected_servers()])
    105106        self.servers = self.servers[5:] + self.servers[:5]
    106107
    107108        if mutable:
  • src/allmydata/test/test_immutable.py

    diff --git a/src/allmydata/test/test_immutable.py b/src/allmydata/test/test_immutable.py
    index edac3e6..1ca4383 100644
    a b class TestShareFinder(unittest.TestCase): 
    9595                    self.s.hungry()
    9696                eventually(_give_buckets_and_hunger_again)
    9797                return d
     98        class MockIServer(object):
     99            def __init__(self, serverid, rref):
     100                self.serverid = serverid
     101                self.rref = rref
     102            def get_serverid(self):
     103                return self.serverid
     104            def get_rref(self):
     105                return self.rref
    98106
    99107        mockserver1 = MockServer({1: mock.Mock(), 2: mock.Mock()})
    100108        mockserver2 = MockServer({})
    101109        mockserver3 = MockServer({3: mock.Mock()})
    102110        mockstoragebroker = mock.Mock()
    103         mockstoragebroker.get_servers_for_index.return_value = [ ('ms1', mockserver1), ('ms2', mockserver2), ('ms3', mockserver3), ]
     111        servers = [ MockIServer("ms1", mockserver1),
     112                    MockIServer("ms2", mockserver2),
     113                    MockIServer("ms3", mockserver3), ]
     114        mockstoragebroker.get_servers_for_psi.return_value = servers
    104115        mockdownloadstatus = mock.Mock()
    105116        mocknode = MockNode(check_reneging=True, check_fetch_failed=True)
    106117
  • src/allmydata/test/test_mutable.py

    diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py
    index e4e6eb7..0f49dce 100644
    a b class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin): 
    19101910        d.addCallback(_got_key)
    19111911        def _break_peer0(res):
    19121912            si = self._storage_index
    1913             peerlist = nm.storage_broker.get_servers_for_index(si)
    1914             peerid0, connection0 = peerlist[0]
    1915             peerid1, connection1 = peerlist[1]
    1916             connection0.broken = True
    1917             self.connection1 = connection1
     1913            servers = nm.storage_broker.get_servers_for_psi(si)
     1914            self.g.break_server(servers[0].get_serverid())
     1915            self.server1 = servers[1]
    19181916        d.addCallback(_break_peer0)
    19191917        # now "create" the file, using the pre-established key, and let the
    19201918        # initial publish finally happen
    class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin): 
    19251923            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
    19261924            # now break the second peer
    19271925            def _break_peer1(res):
    1928                 self.connection1.broken = True
     1926                self.g.break_server(self.server1.get_serverid())
    19291927            d.addCallback(_break_peer1)
    19301928            d.addCallback(lambda res: n.overwrite("contents 2"))
    19311929            # that ought to work too
    class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin): 
    19561954        nm = self.g.clients[0].nodemaker
    19571955        sb = nm.storage_broker
    19581956
    1959         peerids = [serverid for (serverid,ss) in sb.get_all_servers()]
     1957        peerids = [s.get_serverid() for s in sb.get_connected_servers()]
    19601958        self.g.break_server(peerids[0])
    19611959
    19621960        d = nm.create_mutable_file("contents 1")
    class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin): 
    19801978        self.basedir = "mutable/Problems/test_publish_all_servers_bad"
    19811979        self.set_up_grid()
    19821980        nm = self.g.clients[0].nodemaker
    1983         for (serverid,ss) in nm.storage_broker.get_all_servers():
    1984             ss.broken = True
     1981        for s in nm.storage_broker.get_connected_servers():
     1982            s.get_rref().broken = True
    19851983
    19861984        d = self.shouldFail(NotEnoughServersError,
    19871985                            "test_publish_all_servers_bad",
    class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin): 
    20332031            #  1. notice which server gets a read() call first
    20342032            #  2. tell that server to start throwing errors
    20352033            killer = FirstServerGetsKilled()
    2036             for (serverid,ss) in nm.storage_broker.get_all_servers():
    2037                 ss.post_call_notifier = killer.notify
     2034            for s in nm.storage_broker.get_connected_servers():
     2035                s.get_rref().post_call_notifier = killer.notify
    20382036        d.addCallback(_created)
    20392037
    20402038        # now we update a servermap from a new node (which doesn't have the
    class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin): 
    20592057            self.uri = n.get_uri()
    20602058            self.n2 = nm.create_from_cap(self.uri)
    20612059            deleter = FirstServerGetsDeleted()
    2062             for (serverid,ss) in nm.storage_broker.get_all_servers():
    2063                 ss.post_call_notifier = deleter.notify
     2060            for s in nm.storage_broker.get_connected_servers():
     2061                s.get_rref().post_call_notifier = deleter.notify
    20642062        d.addCallback(_created)
    20652063        d.addCallback(lambda res: self.n2.get_servermap(MODE_WRITE))
    20662064        return d
  • src/allmydata/test/test_system.py

    diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py
    index bf6af09..e37f663 100644
    a b class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): 
    6565                all_peerids = c.get_storage_broker().get_all_serverids()
    6666                self.failUnlessEqual(len(all_peerids), self.numclients+1)
    6767                sb = c.storage_broker
    68                 permuted_peers = sb.get_servers_for_index("a")
     68                permuted_peers = sb.get_servers_for_psi("a")
    6969                self.failUnlessEqual(len(permuted_peers), self.numclients+1)
    7070
    7171        d.addCallback(_check)
    class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): 
    101101                all_peerids = c.get_storage_broker().get_all_serverids()
    102102                self.failUnlessEqual(len(all_peerids), self.numclients)
    103103                sb = c.storage_broker
    104                 permuted_peers = sb.get_servers_for_index("a")
     104                permuted_peers = sb.get_servers_for_psi("a")
    105105                self.failUnlessEqual(len(permuted_peers), self.numclients)
    106106        d.addCallback(_check_connections)
    107107
  • src/allmydata/test/test_upload.py

    diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py
    index 022185e..1125932 100644
    a b class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, 
    17301730        d.addCallback(lambda ign:
    17311731            self._add_server(server_number=2))
    17321732        def _break_server_2(ign):
    1733             server = self.g.servers_by_number[2].my_nodeid
    1734             # We have to break the server in servers_by_id,
    1735             # because the one in servers_by_number isn't wrapped,
    1736             # and doesn't look at its broken attribute when answering
    1737             # queries.
    1738             self.g.servers_by_id[server].broken = True
     1733            serverid = self.g.servers_by_number[2].my_nodeid
     1734            self.g.break_server(serverid)
    17391735        d.addCallback(_break_server_2)
    17401736        d.addCallback(lambda ign:
    17411737            self._add_server(server_number=3, readonly=True))
  • src/allmydata/web/check_results.py

    diff --git a/src/allmydata/web/check_results.py b/src/allmydata/web/check_results.py
    index 96ce17b..44d0506 100644
    a b class ResultsBase: 
    139139
    140140        # this table is sorted by permuted order
    141141        sb = c.get_storage_broker()
    142         permuted_peer_ids = [peerid
    143                              for (peerid, rref)
    144                              in sb.get_servers_for_index(cr.get_storage_index())]
     142        permuted_peer_ids = [s.get_serverid()
     143                             for s
     144                             in sb.get_servers_for_psi(cr.get_storage_index())]
    145145
    146146        num_shares_left = sum([len(shares) for shares in servers.values()])
    147147        servermap = []
  • src/allmydata/web/root.py

    diff --git a/src/allmydata/web/root.py b/src/allmydata/web/root.py
    index 3af15d9..2f1f6d4 100644
    a b class Root(rend.Page): 
    247247
    248248    def data_connected_storage_servers(self, ctx, data):
    249249        sb = self.client.get_storage_broker()
    250         return len(sb.get_all_servers())
     250        return len(sb.get_connected_servers())
    251251
    252252    def data_services(self, ctx, data):
    253253        sb = self.client.get_storage_broker()
    254         return sb.get_all_descriptors()
     254        return sb.get_known_servers()
    255255
    256     def render_service_row(self, ctx, descriptor):
    257         nodeid = descriptor.get_serverid()
     256    def render_service_row(self, ctx, server):
     257        nodeid = server.get_serverid()
    258258
    259259        ctx.fillSlots("peerid", idlib.nodeid_b2a(nodeid))
    260         ctx.fillSlots("nickname", descriptor.get_nickname())
    261         rhost = descriptor.get_remote_host()
     260        ctx.fillSlots("nickname", server.get_nickname())
     261        rhost = server.get_remote_host()
    262262        if rhost:
    263263            if nodeid == self.client.nodeid:
    264264                rhost_s = "(loopback)"
    class Root(rend.Page): 
    267267            else:
    268268                rhost_s = str(rhost)
    269269            connected = "Yes: to " + rhost_s
    270             since = descriptor.get_last_connect_time()
     270            since = server.get_last_connect_time()
    271271        else:
    272272            connected = "No"
    273             since = descriptor.get_last_loss_time()
    274         announced = descriptor.get_announcement_time()
    275         announcement = descriptor.get_announcement()
     273            since = server.get_last_loss_time()
     274        announced = server.get_announcement_time()
     275        announcement = server.get_announcement()
    276276        version = announcement["my-version"]
    277277        service_name = announcement["service-name"]
    278278