-
diff --git a/src/allmydata/client.py b/src/allmydata/client.py
index fa515d4..fb7e0c8 100644
a
|
b
|
class Client(node.Node, pollmixin.PollMixin): |
468 | 468 | temporary test network and need to know when it is safe to proceed |
469 | 469 | with an upload or download.""" |
470 | 470 | def _check(): |
471 | | return len(self.storage_broker.get_all_servers()) >= num_clients |
| 471 | return len(self.storage_broker.get_connected_servers()) >= num_clients |
472 | 472 | d = self.poll(_check, 0.5) |
473 | 473 | d.addCallback(lambda res: None) |
474 | 474 | return d |
-
diff --git a/src/allmydata/control.py b/src/allmydata/control.py
index cb0c84f..045a34e 100644
a
|
b
|
class ControlServer(Referenceable, service.Service): |
91 | 91 | # 300ms. |
92 | 92 | results = {} |
93 | 93 | sb = self.parent.get_storage_broker() |
94 | | everyone = sb.get_all_servers() |
| 94 | everyone = sb.get_connected_servers() |
95 | 95 | num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3))) |
96 | 96 | everyone = list(everyone) * num_pings |
97 | 97 | d = self._do_one_ping(None, everyone, results) |
… |
… |
class ControlServer(Referenceable, service.Service): |
99 | 99 | def _do_one_ping(self, res, everyone_left, results): |
100 | 100 | if not everyone_left: |
101 | 101 | return results |
102 | | peerid, connection = everyone_left.pop(0) |
| 102 | server = everyone_left.pop(0) |
| 103 | peerid = server.get_serverid() |
| 104 | connection = server.get_rref() |
103 | 105 | start = time.time() |
104 | 106 | d = connection.callRemote("get_buckets", "\x00"*16) |
105 | 107 | def _done(ignored): |
-
diff --git a/src/allmydata/immutable/checker.py b/src/allmydata/immutable/checker.py
index cd5c556..f350029 100644
a
|
b
|
class Checker(log.PrefixingLogMixin): |
463 | 463 | def __init__(self, verifycap, servers, verify, add_lease, secret_holder, |
464 | 464 | monitor): |
465 | 465 | assert precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap)) |
466 | | assert precondition(isinstance(servers, (set, frozenset)), servers) |
467 | | for (serverid, serverrref) in servers: |
468 | | assert precondition(isinstance(serverid, str)) |
469 | 466 | |
470 | 467 | prefix = "%s" % base32.b2a_l(verifycap.get_storage_index()[:8], 60) |
471 | 468 | log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix) |
… |
… |
class Checker(log.PrefixingLogMixin): |
489 | 486 | def _get_cancel_secret(self, peerid): |
490 | 487 | return bucket_cancel_secret_hash(self.file_cancel_secret, peerid) |
491 | 488 | |
492 | | def _get_buckets(self, server, storageindex, serverid): |
| 489 | def _get_buckets(self, s, storageindex): |
493 | 490 | """Return a deferred that eventually fires with ({sharenum: bucket}, |
494 | 491 | serverid, success). In case the server is disconnected or returns a |
495 | 492 | Failure then it fires with ({}, serverid, False) (A server |
… |
… |
class Checker(log.PrefixingLogMixin): |
498 | 495 | that we want to track and report whether or not each server |
499 | 496 | responded.)""" |
500 | 497 | |
| 498 | rref = s.get_rref() |
| 499 | serverid = s.get_serverid() |
501 | 500 | if self._add_lease: |
502 | 501 | renew_secret = self._get_renewal_secret(serverid) |
503 | 502 | cancel_secret = self._get_cancel_secret(serverid) |
504 | | d2 = server.callRemote("add_lease", storageindex, |
505 | | renew_secret, cancel_secret) |
| 503 | d2 = rref.callRemote("add_lease", storageindex, |
| 504 | renew_secret, cancel_secret) |
506 | 505 | d2.addErrback(self._add_lease_failed, serverid, storageindex) |
507 | 506 | |
508 | | d = server.callRemote("get_buckets", storageindex) |
| 507 | d = rref.callRemote("get_buckets", storageindex) |
509 | 508 | def _wrap_results(res): |
510 | 509 | return (res, serverid, True) |
511 | 510 | |
… |
… |
class Checker(log.PrefixingLogMixin): |
656 | 655 | |
657 | 656 | return d |
658 | 657 | |
659 | | def _verify_server_shares(self, serverid, ss): |
| 658 | def _verify_server_shares(self, s): |
660 | 659 | """ Return a deferred which eventually fires with a tuple of |
661 | 660 | (set(sharenum), serverid, set(corruptsharenum), |
662 | 661 | set(incompatiblesharenum), success) showing all the shares verified |
… |
… |
class Checker(log.PrefixingLogMixin): |
679 | 678 | then disconnected and ceased responding, or returned a failure, it is |
680 | 679 | still marked with the True flag for 'success'. |
681 | 680 | """ |
682 | | d = self._get_buckets(ss, self._verifycap.get_storage_index(), serverid) |
| 681 | d = self._get_buckets(s, self._verifycap.get_storage_index()) |
683 | 682 | |
684 | 683 | def _got_buckets(result): |
685 | 684 | bucketdict, serverid, success = result |
… |
… |
class Checker(log.PrefixingLogMixin): |
710 | 709 | |
711 | 710 | def _err(f): |
712 | 711 | f.trap(RemoteException, DeadReferenceError) |
713 | | return (set(), serverid, set(), set(), False) |
| 712 | return (set(), s.get_serverid(), set(), set(), False) |
714 | 713 | |
715 | 714 | d.addCallbacks(_got_buckets, _err) |
716 | 715 | return d |
717 | 716 | |
718 | | def _check_server_shares(self, serverid, ss): |
| 717 | def _check_server_shares(self, s): |
719 | 718 | """Return a deferred which eventually fires with a tuple of |
720 | 719 | (set(sharenum), serverid, set(), set(), responded) showing all the |
721 | 720 | shares claimed to be served by this server. In case the server is |
… |
… |
class Checker(log.PrefixingLogMixin): |
726 | 725 | def _curry_empty_corrupted(res): |
727 | 726 | buckets, serverid, responded = res |
728 | 727 | return (set(buckets), serverid, set(), set(), responded) |
729 | | d = self._get_buckets(ss, self._verifycap.get_storage_index(), serverid) |
| 728 | d = self._get_buckets(s, self._verifycap.get_storage_index()) |
730 | 729 | d.addCallback(_curry_empty_corrupted) |
731 | 730 | return d |
732 | 731 | |
… |
… |
class Checker(log.PrefixingLogMixin): |
794 | 793 | def start(self): |
795 | 794 | ds = [] |
796 | 795 | if self._verify: |
797 | | for (serverid, ss) in self._servers: |
798 | | ds.append(self._verify_server_shares(serverid, ss)) |
| 796 | for s in self._servers: |
| 797 | ds.append(self._verify_server_shares(s)) |
799 | 798 | else: |
800 | | for (serverid, ss) in self._servers: |
801 | | ds.append(self._check_server_shares(serverid, ss)) |
| 799 | for s in self._servers: |
| 800 | ds.append(self._check_server_shares(s)) |
802 | 801 | |
803 | 802 | return deferredutil.gatherResults(ds).addCallback(self._format_results) |
-
diff --git a/src/allmydata/immutable/downloader/finder.py b/src/allmydata/immutable/downloader/finder.py
index 4816ccd..f1142e7 100644
a
|
b
|
class ShareFinder: |
62 | 62 | # test_dirnode, which creates us with storage_broker=None |
63 | 63 | if not self._started: |
64 | 64 | si = self.verifycap.storage_index |
65 | | s = self._storage_broker.get_servers_for_index(si) |
66 | | self._servers = iter(s) |
| 65 | servers = [(s.get_serverid(), s.get_rref()) |
| 66 | for s in self._storage_broker.get_servers_for_psi(si)] |
| 67 | self._servers = iter(servers) |
67 | 68 | self._started = True |
68 | 69 | |
69 | 70 | def log(self, *args, **kwargs): |
-
diff --git a/src/allmydata/immutable/filenode.py b/src/allmydata/immutable/filenode.py
index ed3785b..8fc4772 100644
a
|
b
|
class CiphertextFileNode: |
102 | 102 | verifycap = self._verifycap |
103 | 103 | storage_index = verifycap.storage_index |
104 | 104 | sb = self._storage_broker |
105 | | servers = sb.get_all_servers() |
| 105 | servers = sb.get_connected_servers() |
106 | 106 | sh = self._secret_holder |
107 | 107 | |
108 | 108 | c = Checker(verifycap=verifycap, servers=servers, |
… |
… |
class CiphertextFileNode: |
160 | 160 | def check(self, monitor, verify=False, add_lease=False): |
161 | 161 | verifycap = self._verifycap |
162 | 162 | sb = self._storage_broker |
163 | | servers = sb.get_all_servers() |
| 163 | servers = sb.get_connected_servers() |
164 | 164 | sh = self._secret_holder |
165 | 165 | |
166 | 166 | v = Checker(verifycap=verifycap, servers=servers, |
-
diff --git a/src/allmydata/immutable/offloaded.py b/src/allmydata/immutable/offloaded.py
index dea94c5..26be786 100644
a
|
b
|
class CHKCheckerAndUEBFetcher: |
53 | 53 | |
54 | 54 | def _get_all_shareholders(self, storage_index): |
55 | 55 | dl = [] |
56 | | for (peerid, ss) in self._peer_getter(storage_index): |
57 | | d = ss.callRemote("get_buckets", storage_index) |
| 56 | for s in self._peer_getter(storage_index): |
| 57 | d = s.get_rref().callRemote("get_buckets", storage_index) |
58 | 58 | d.addCallbacks(self._got_response, self._got_error, |
59 | | callbackArgs=(peerid,)) |
| 59 | callbackArgs=(s.get_serverid(),)) |
60 | 60 | dl.append(d) |
61 | 61 | return defer.DeferredList(dl) |
62 | 62 | |
… |
… |
class Helper(Referenceable): |
620 | 620 | lp2 = self.log("doing a quick check+UEBfetch", |
621 | 621 | parent=lp, level=log.NOISY) |
622 | 622 | sb = self._storage_broker |
623 | | c = CHKCheckerAndUEBFetcher(sb.get_servers_for_index, storage_index, lp2) |
| 623 | c = CHKCheckerAndUEBFetcher(sb.get_servers_for_psi, storage_index, lp2) |
624 | 624 | d = c.check() |
625 | 625 | def _checked(res): |
626 | 626 | if res: |
-
diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py
index 5dd257b..a53f12a 100644
a
|
b
|
class Tahoe2PeerSelector(log.PrefixingLogMixin): |
224 | 224 | num_share_hashes, EXTENSION_SIZE, |
225 | 225 | None) |
226 | 226 | allocated_size = wbp.get_allocated_size() |
227 | | all_peers = storage_broker.get_servers_for_index(storage_index) |
| 227 | all_peers = [(s.get_serverid(), s.get_rref()) |
| 228 | for s in storage_broker.get_servers_for_psi(storage_index)] |
228 | 229 | if not all_peers: |
229 | 230 | raise NoServersError("client gave us zero peers") |
230 | 231 | |
-
diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py
index 48094a9..85c3e07 100644
a
|
b
|
class IStorageBucketReader(Interface): |
352 | 352 | """ |
353 | 353 | |
354 | 354 | class IStorageBroker(Interface): |
355 | | def get_servers_for_index(peer_selection_index): |
| 355 | def get_servers_for_psi(peer_selection_index): |
356 | 356 | """ |
357 | | @return: list of (peerid, versioned-rref) tuples |
| 357 | @return: list of IServer instances |
358 | 358 | """ |
359 | | def get_all_servers(): |
| 359 | def get_connected_servers(): |
360 | 360 | """ |
361 | | @return: frozenset of (peerid, versioned-rref) tuples |
| 361 | @return: frozenset of connected IServer instances |
| 362 | """ |
| 363 | def get_known_servers(): |
| 364 | """ |
| 365 | @return: frozenset of IServer instances |
362 | 366 | """ |
363 | 367 | def get_all_serverids(): |
364 | 368 | """ |
-
diff --git a/src/allmydata/mutable/publish.py b/src/allmydata/mutable/publish.py
index 2d63c87..580682b 100644
a
|
b
|
class Publish: |
179 | 179 | self._encprivkey = self._node.get_encprivkey() |
180 | 180 | |
181 | 181 | sb = self._storage_broker |
182 | | full_peerlist = sb.get_servers_for_index(self._storage_index) |
| 182 | full_peerlist = [(s.get_serverid(), s.get_rref()) |
| 183 | for s in sb.get_servers_for_psi(self._storage_index)] |
183 | 184 | self.full_peerlist = full_peerlist # for use later, immutable |
184 | 185 | self.bad_peers = set() # peerids who have errbacked/refused requests |
185 | 186 | |
-
diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py
index 999691f..c69e410 100644
a
|
b
|
class ServermapUpdater: |
424 | 424 | self._queries_completed = 0 |
425 | 425 | |
426 | 426 | sb = self._storage_broker |
427 | | full_peerlist = sb.get_servers_for_index(self._storage_index) |
| 427 | full_peerlist = [(s.get_serverid(), s.get_rref()) |
| 428 | for s in sb.get_servers_for_psi(self._storage_index)] |
428 | 429 | self.full_peerlist = full_peerlist # for use later, immutable |
429 | 430 | self.extra_peers = full_peerlist[:] # peers are removed as we use them |
430 | 431 | self._good_peers = set() # peers who had some shares |
-
diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py
index 7945eac..e1b3f55 100644
a
|
b
|
from zope.interface import implements, Interface |
34 | 34 | from foolscap.api import eventually |
35 | 35 | from allmydata.interfaces import IStorageBroker |
36 | 36 | from allmydata.util import idlib, log |
37 | | from allmydata.util.assertutil import _assert, precondition |
| 37 | from allmydata.util.assertutil import precondition |
38 | 38 | from allmydata.util.rrefutil import add_version_to_remote_reference |
39 | 39 | from allmydata.util.hashutil import sha1 |
40 | 40 | |
… |
… |
class StorageFarmBroker: |
66 | 66 | self.tub = tub |
67 | 67 | assert permute_peers # False not implemented yet |
68 | 68 | self.permute_peers = permute_peers |
69 | | # self.descriptors maps serverid -> IServerDescriptor, and keeps |
70 | | # track of all the storage servers that we've heard about. Each |
71 | | # descriptor manages its own Reconnector, and will give us a |
72 | | # RemoteReference when we ask them for it. |
73 | | self.descriptors = {} |
| 69 | # self.servers maps serverid -> IServer, and keeps track of all the |
| 70 | # storage servers that we've heard about. Each descriptor manages its |
| 71 | # own Reconnector, and will give us a RemoteReference when we ask |
| 72 | # them for it. |
| 73 | self.servers = {} |
74 | 74 | # self.test_servers are statically configured from unit tests |
75 | 75 | self.test_servers = {} # serverid -> rref |
76 | 76 | self.introducer_client = None |
… |
… |
class StorageFarmBroker: |
79 | 79 | def test_add_server(self, serverid, rref): |
80 | 80 | self.test_servers[serverid] = rref |
81 | 81 | def test_add_descriptor(self, serverid, dsc): |
82 | | self.descriptors[serverid] = dsc |
| 82 | self.servers[serverid] = dsc |
83 | 83 | |
84 | 84 | def use_introducer(self, introducer_client): |
85 | 85 | self.introducer_client = ic = introducer_client |
… |
… |
class StorageFarmBroker: |
89 | 89 | precondition(isinstance(serverid, str), serverid) |
90 | 90 | precondition(len(serverid) == 20, serverid) |
91 | 91 | assert ann_d["service-name"] == "storage" |
92 | | old = self.descriptors.get(serverid) |
| 92 | old = self.servers.get(serverid) |
93 | 93 | if old: |
94 | 94 | if old.get_announcement() == ann_d: |
95 | 95 | return # duplicate |
96 | 96 | # replacement |
97 | | del self.descriptors[serverid] |
| 97 | del self.servers[serverid] |
98 | 98 | old.stop_connecting() |
99 | 99 | # now we forget about them and start using the new one |
100 | | dsc = NativeStorageClientDescriptor(serverid, ann_d) |
101 | | self.descriptors[serverid] = dsc |
| 100 | dsc = NativeStorageServer(serverid, ann_d) |
| 101 | self.servers[serverid] = dsc |
102 | 102 | dsc.start_connecting(self.tub, self._trigger_connections) |
103 | 103 | # the descriptor will manage their own Reconnector, and each time we |
104 | 104 | # need servers, we'll ask them if they're connected or not. |
… |
… |
class StorageFarmBroker: |
111 | 111 | # connections to only a subset of the servers, which would increase |
112 | 112 | # the chances that we'll put shares in weird places (and not update |
113 | 113 | # existing shares of mutable files). See #374 for more details. |
114 | | for dsc in self.descriptors.values(): |
| 114 | for dsc in self.servers.values(): |
115 | 115 | dsc.try_to_connect() |
116 | 116 | |
117 | | |
118 | | |
119 | | def get_servers_for_index(self, peer_selection_index): |
120 | | # first cut: return a list of (peerid, versioned-rref) tuples |
| 117 | def get_servers_for_psi(self, peer_selection_index): |
| 118 | # return a list of server objects (IServers) |
121 | 119 | assert self.permute_peers == True |
122 | | servers = self.get_all_servers() |
123 | | key = peer_selection_index |
124 | | return sorted(servers, key=lambda x: sha1(key+x[0]).digest()) |
125 | | |
126 | | def get_all_servers(self): |
127 | | # return a frozenset of (peerid, versioned-rref) tuples |
128 | | servers = {} |
129 | | for serverid,rref in self.test_servers.items(): |
130 | | servers[serverid] = rref |
131 | | for serverid,dsc in self.descriptors.items(): |
132 | | rref = dsc.get_rref() |
133 | | if rref: |
134 | | servers[serverid] = rref |
135 | | result = frozenset(servers.items()) |
136 | | _assert(len(result) <= len(self.get_all_serverids()), result, self.get_all_serverids()) |
137 | | return result |
| 120 | def _permuted(server): |
| 121 | seed = server.get_permutation_seed() |
| 122 | return sha1(peer_selection_index + seed).digest() |
| 123 | return sorted(self.get_connected_servers(), key=_permuted) |
138 | 124 | |
139 | 125 | def get_all_serverids(self): |
140 | 126 | serverids = set() |
141 | 127 | serverids.update(self.test_servers.keys()) |
142 | | serverids.update(self.descriptors.keys()) |
| 128 | serverids.update(self.servers.keys()) |
143 | 129 | return frozenset(serverids) |
144 | 130 | |
145 | | def get_all_descriptors(self): |
146 | | return sorted(self.descriptors.values(), |
147 | | key=lambda dsc: dsc.get_serverid()) |
| 131 | def get_connected_servers(self): |
| 132 | return frozenset([s for s in self.get_known_servers() |
| 133 | if s.get_rref()]) |
| 134 | |
| 135 | def get_known_servers(self): |
| 136 | servers = [] |
| 137 | for serverid,rref in self.test_servers.items(): |
| 138 | s = NativeStorageServer(serverid, {}) |
| 139 | s.rref = rref |
| 140 | servers.append(s) |
| 141 | servers.extend(self.servers.values()) |
| 142 | return sorted(servers, key=lambda s: s.get_serverid()) |
148 | 143 | |
149 | 144 | def get_nickname_for_serverid(self, serverid): |
150 | | if serverid in self.descriptors: |
151 | | return self.descriptors[serverid].get_nickname() |
| 145 | if serverid in self.servers: |
| 146 | return self.servers[serverid].get_nickname() |
152 | 147 | return None |
153 | 148 | |
154 | 149 | |
155 | | class IServerDescriptor(Interface): |
| 150 | class IServer(Interface): |
| 151 | """I live in the client, and represent a single server.""" |
156 | 152 | def start_connecting(tub, trigger_cb): |
157 | 153 | pass |
158 | 154 | def get_nickname(): |
… |
… |
class IServerDescriptor(Interface): |
160 | 156 | def get_rref(): |
161 | 157 | pass |
162 | 158 | |
163 | | class NativeStorageClientDescriptor: |
| 159 | class NativeStorageServer: |
164 | 160 | """I hold information about a storage server that we want to connect to. |
165 | 161 | If we are connected, I hold the RemoteReference, their host address, and |
166 | 162 | the their version information. I remember information about when we were |
… |
… |
class NativeStorageClientDescriptor: |
176 | 172 | @ivar rref: the RemoteReference, if connected, otherwise None |
177 | 173 | @ivar remote_host: the IAddress, if connected, otherwise None |
178 | 174 | """ |
179 | | implements(IServerDescriptor) |
| 175 | implements(IServer) |
180 | 176 | |
181 | 177 | VERSION_DEFAULTS = { |
182 | 178 | "http://allmydata.org/tahoe/protocols/storage/v1" : |
… |
… |
class NativeStorageClientDescriptor: |
203 | 199 | |
204 | 200 | def get_serverid(self): |
205 | 201 | return self.serverid |
| 202 | def get_permutation_seed(self): |
| 203 | return self.serverid |
206 | 204 | |
207 | 205 | def get_nickname(self): |
208 | 206 | return self.announcement["nickname"].decode("utf-8") |
-
diff --git a/src/allmydata/test/common.py b/src/allmydata/test/common.py
index e8117bf..dfcaa10 100644
a
|
b
|
class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): |
572 | 572 | if not c.connected_to_introducer(): |
573 | 573 | return False |
574 | 574 | sb = c.get_storage_broker() |
575 | | if len(sb.get_all_servers()) != self.numclients: |
| 575 | if len(sb.get_connected_servers()) != self.numclients: |
576 | 576 | return False |
577 | 577 | return True |
578 | 578 | |
-
diff --git a/src/allmydata/test/no_network.py b/src/allmydata/test/no_network.py
index 554a75f..f07c36b 100644
a
|
b
|
def wrap_storage_server(original): |
117 | 117 | wrapper.version = original.remote_get_version() |
118 | 118 | return wrapper |
119 | 119 | |
| 120 | class NoNetworkServer: |
| 121 | def __init__(self, serverid, rref): |
| 122 | self.serverid = serverid |
| 123 | self.rref = rref |
| 124 | def get_serverid(self): |
| 125 | return self.serverid |
| 126 | def get_permutation_seed(self): |
| 127 | return self.serverid |
| 128 | def get_rref(self): |
| 129 | return self.rref |
| 130 | |
120 | 131 | class NoNetworkStorageBroker: |
121 | 132 | implements(IStorageBroker) |
122 | | def get_servers_for_index(self, key): |
123 | | return sorted(self.client._servers, |
124 | | key=lambda x: sha1(key+x[0]).digest()) |
125 | | def get_all_servers(self): |
126 | | return frozenset(self.client._servers) |
| 133 | def get_servers_for_psi(self, peer_selection_index): |
| 134 | def _permuted(server): |
| 135 | seed = server.get_permutation_seed() |
| 136 | return sha1(peer_selection_index + seed).digest() |
| 137 | return sorted(self.get_connected_servers(), key=_permuted) |
| 138 | def get_connected_servers(self): |
| 139 | return self.client._servers |
127 | 140 | def get_nickname_for_serverid(self, serverid): |
128 | 141 | return None |
129 | 142 | |
… |
… |
class NoNetworkGrid(service.MultiService): |
181 | 194 | self.basedir = basedir |
182 | 195 | fileutil.make_dirs(basedir) |
183 | 196 | |
184 | | self.servers_by_number = {} |
185 | | self.servers_by_id = {} |
| 197 | self.servers_by_number = {} # maps to StorageServer instance |
| 198 | self.wrappers_by_id = {} # maps to wrapped StorageServer instance |
| 199 | self.proxies_by_id = {} # maps to IServer on which .rref is a wrapped |
| 200 | # StorageServer |
186 | 201 | self.clients = [] |
187 | 202 | |
188 | 203 | for i in range(num_servers): |
… |
… |
class NoNetworkGrid(service.MultiService): |
234 | 249 | ss.setServiceParent(middleman) |
235 | 250 | serverid = ss.my_nodeid |
236 | 251 | self.servers_by_number[i] = ss |
237 | | self.servers_by_id[serverid] = wrap_storage_server(ss) |
| 252 | wrapper = wrap_storage_server(ss) |
| 253 | self.wrappers_by_id[serverid] = wrapper |
| 254 | self.proxies_by_id[serverid] = NoNetworkServer(serverid, wrapper) |
238 | 255 | self.rebuild_serverlist() |
239 | 256 | |
| 257 | def get_all_serverids(self): |
| 258 | return self.proxies_by_id.keys() |
| 259 | |
240 | 260 | def rebuild_serverlist(self): |
241 | | self.all_servers = frozenset(self.servers_by_id.items()) |
| 261 | self.all_servers = frozenset(self.proxies_by_id.values()) |
242 | 262 | for c in self.clients: |
243 | 263 | c._servers = self.all_servers |
244 | 264 | |
… |
… |
class NoNetworkGrid(service.MultiService): |
249 | 269 | if ss.my_nodeid == serverid: |
250 | 270 | del self.servers_by_number[i] |
251 | 271 | break |
252 | | del self.servers_by_id[serverid] |
| 272 | del self.wrappers_by_id[serverid] |
| 273 | del self.proxies_by_id[serverid] |
253 | 274 | self.rebuild_serverlist() |
254 | 275 | |
255 | 276 | def break_server(self, serverid): |
256 | 277 | # mark the given server as broken, so it will throw exceptions when |
257 | 278 | # asked to hold a share or serve a share |
258 | | self.servers_by_id[serverid].broken = True |
| 279 | self.wrappers_by_id[serverid].broken = True |
259 | 280 | |
260 | 281 | def hang_server(self, serverid): |
261 | 282 | # hang the given server |
262 | | ss = self.servers_by_id[serverid] |
| 283 | ss = self.wrappers_by_id[serverid] |
263 | 284 | assert ss.hung_until is None |
264 | 285 | ss.hung_until = defer.Deferred() |
265 | 286 | |
266 | 287 | def unhang_server(self, serverid): |
267 | 288 | # unhang the given server |
268 | | ss = self.servers_by_id[serverid] |
| 289 | ss = self.wrappers_by_id[serverid] |
269 | 290 | assert ss.hung_until is not None |
270 | 291 | ss.hung_until.callback(None) |
271 | 292 | ss.hung_until = None |
-
diff --git a/src/allmydata/test/test_checker.py b/src/allmydata/test/test_checker.py
index 7548d7c..d1cf86f 100644
a
|
b
|
import simplejson |
3 | 3 | from twisted.trial import unittest |
4 | 4 | from allmydata import check_results, uri |
5 | 5 | from allmydata.web import check_results as web_check_results |
6 | | from allmydata.storage_client import StorageFarmBroker, NativeStorageClientDescriptor |
| 6 | from allmydata.storage_client import StorageFarmBroker, NativeStorageServer |
7 | 7 | from allmydata.monitor import Monitor |
8 | 8 | from allmydata.test.no_network import GridTestMixin |
9 | 9 | from allmydata.immutable.upload import Data |
… |
… |
class WebResultsRendering(unittest.TestCase, WebRenderingMixin): |
28 | 28 | "my-version": "ver", |
29 | 29 | "oldest-supported": "oldest", |
30 | 30 | } |
31 | | dsc = NativeStorageClientDescriptor(peerid, ann_d) |
| 31 | dsc = NativeStorageServer(peerid, ann_d) |
32 | 32 | sb.test_add_descriptor(peerid, dsc) |
33 | 33 | c = FakeClient() |
34 | 34 | c.storage_broker = sb |
-
diff --git a/src/allmydata/test/test_client.py b/src/allmydata/test/test_client.py
index 7c6a456..abf63aa 100644
a
|
b
|
class Basic(testutil.ReallyEqualMixin, unittest.TestCase): |
134 | 134 | self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 0) |
135 | 135 | |
136 | 136 | def _permute(self, sb, key): |
137 | | return [ peerid |
138 | | for (peerid,rref) in sb.get_servers_for_index(key) ] |
| 137 | return [ s.get_serverid() for s in sb.get_servers_for_psi(key) ] |
139 | 138 | |
140 | 139 | def test_permute(self): |
141 | 140 | sb = StorageFarmBroker(None, True) |
142 | 141 | for k in ["%d" % i for i in range(5)]: |
143 | | sb.test_add_server(k, None) |
| 142 | sb.test_add_server(k, "rref") |
144 | 143 | |
145 | 144 | self.failUnlessReallyEqual(self._permute(sb, "one"), ['3','1','0','4','2']) |
146 | 145 | self.failUnlessReallyEqual(self._permute(sb, "two"), ['0','4','2','1','3']) |
-
diff --git a/src/allmydata/test/test_deepcheck.py b/src/allmydata/test/test_deepcheck.py
index 5168b37..4bcfe28 100644
a
|
b
|
class DeepCheckWebGood(DeepCheckBase, unittest.TestCase): |
287 | 287 | self.failUnlessEqual(d["list-corrupt-shares"], [], where) |
288 | 288 | if not incomplete: |
289 | 289 | self.failUnlessEqual(sorted(d["servers-responding"]), |
290 | | sorted(self.g.servers_by_id.keys()), |
| 290 | sorted(self.g.get_all_serverids()), |
291 | 291 | where) |
292 | 292 | self.failUnless("sharemap" in d, str((where, d))) |
293 | 293 | all_serverids = set() |
294 | 294 | for (shareid, serverids) in d["sharemap"].items(): |
295 | 295 | all_serverids.update(serverids) |
296 | 296 | self.failUnlessEqual(sorted(all_serverids), |
297 | | sorted(self.g.servers_by_id.keys()), |
| 297 | sorted(self.g.get_all_serverids()), |
298 | 298 | where) |
299 | 299 | |
300 | 300 | self.failUnlessEqual(d["count-wrong-shares"], 0, where) |
… |
… |
class DeepCheckWebGood(DeepCheckBase, unittest.TestCase): |
545 | 545 | if not incomplete: |
546 | 546 | self.failUnlessEqual(sorted(r["servers-responding"]), |
547 | 547 | sorted([idlib.nodeid_b2a(sid) |
548 | | for sid in self.g.servers_by_id]), |
| 548 | for sid in self.g.get_all_serverids()]), |
549 | 549 | where) |
550 | 550 | self.failUnless("sharemap" in r, where) |
551 | 551 | all_serverids = set() |
… |
… |
class DeepCheckWebGood(DeepCheckBase, unittest.TestCase): |
553 | 553 | all_serverids.update(serverids_s) |
554 | 554 | self.failUnlessEqual(sorted(all_serverids), |
555 | 555 | sorted([idlib.nodeid_b2a(sid) |
556 | | for sid in self.g.servers_by_id]), |
| 556 | for sid in self.g.get_all_serverids()]), |
557 | 557 | where) |
558 | 558 | self.failUnlessEqual(r["count-wrong-shares"], 0, where) |
559 | 559 | self.failUnlessEqual(r["count-recoverable-versions"], 1, where) |
-
diff --git a/src/allmydata/test/test_download.py b/src/allmydata/test/test_download.py
index 3a69220..ab2e98b 100644
a
|
b
|
class DownloadTest(_Base, unittest.TestCase): |
596 | 596 | # tweak the client's copies of server-version data, so it believes |
597 | 597 | # that they're old and can't handle reads that overrun the length of |
598 | 598 | # the share. This exercises a different code path. |
599 | | for (peerid, rref) in self.c0.storage_broker.get_all_servers(): |
| 599 | for s in self.c0.storage_broker.get_connected_servers(): |
| 600 | rref = s.get_rref() |
600 | 601 | v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"] |
601 | 602 | v1["tolerates-immutable-read-overrun"] = False |
602 | 603 | |
… |
… |
class DownloadV2(_Base, unittest.TestCase): |
1167 | 1168 | # tweak the client's copies of server-version data, so it believes |
1168 | 1169 | # that they're old and can't handle reads that overrun the length of |
1169 | 1170 | # the share. This exercises a different code path. |
1170 | | for (peerid, rref) in self.c0.storage_broker.get_all_servers(): |
| 1171 | for s in self.c0.storage_broker.get_connected_servers(): |
| 1172 | rref = s.get_rref() |
1171 | 1173 | v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"] |
1172 | 1174 | v1["tolerates-immutable-read-overrun"] = False |
1173 | 1175 | |
… |
… |
class DownloadV2(_Base, unittest.TestCase): |
1186 | 1188 | self.set_up_grid() |
1187 | 1189 | self.c0 = self.g.clients[0] |
1188 | 1190 | |
1189 | | for (peerid, rref) in self.c0.storage_broker.get_all_servers(): |
| 1191 | for s in self.c0.storage_broker.get_connected_servers(): |
| 1192 | rref = s.get_rref() |
1190 | 1193 | v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"] |
1191 | 1194 | v1["tolerates-immutable-read-overrun"] = False |
1192 | 1195 | |
-
diff --git a/src/allmydata/test/test_hung_server.py b/src/allmydata/test/test_hung_server.py
index e63cba3..abed967 100644
a
|
b
|
class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin, |
101 | 101 | |
102 | 102 | self.c0 = self.g.clients[0] |
103 | 103 | nm = self.c0.nodemaker |
104 | | self.servers = sorted([(id, ss) for (id, ss) in nm.storage_broker.get_all_servers()]) |
| 104 | self.servers = sorted([(s.get_serverid(), s.get_rref()) |
| 105 | for s in nm.storage_broker.get_connected_servers()]) |
105 | 106 | self.servers = self.servers[5:] + self.servers[:5] |
106 | 107 | |
107 | 108 | if mutable: |
-
diff --git a/src/allmydata/test/test_immutable.py b/src/allmydata/test/test_immutable.py
index edac3e6..1ca4383 100644
a
|
b
|
class TestShareFinder(unittest.TestCase): |
95 | 95 | self.s.hungry() |
96 | 96 | eventually(_give_buckets_and_hunger_again) |
97 | 97 | return d |
| 98 | class MockIServer(object): |
| 99 | def __init__(self, serverid, rref): |
| 100 | self.serverid = serverid |
| 101 | self.rref = rref |
| 102 | def get_serverid(self): |
| 103 | return self.serverid |
| 104 | def get_rref(self): |
| 105 | return self.rref |
98 | 106 | |
99 | 107 | mockserver1 = MockServer({1: mock.Mock(), 2: mock.Mock()}) |
100 | 108 | mockserver2 = MockServer({}) |
101 | 109 | mockserver3 = MockServer({3: mock.Mock()}) |
102 | 110 | mockstoragebroker = mock.Mock() |
103 | | mockstoragebroker.get_servers_for_index.return_value = [ ('ms1', mockserver1), ('ms2', mockserver2), ('ms3', mockserver3), ] |
| 111 | servers = [ MockIServer("ms1", mockserver1), |
| 112 | MockIServer("ms2", mockserver2), |
| 113 | MockIServer("ms3", mockserver3), ] |
| 114 | mockstoragebroker.get_servers_for_psi.return_value = servers |
104 | 115 | mockdownloadstatus = mock.Mock() |
105 | 116 | mocknode = MockNode(check_reneging=True, check_fetch_failed=True) |
106 | 117 | |
-
diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py
index e4e6eb7..0f49dce 100644
a
|
b
|
class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin): |
1910 | 1910 | d.addCallback(_got_key) |
1911 | 1911 | def _break_peer0(res): |
1912 | 1912 | si = self._storage_index |
1913 | | peerlist = nm.storage_broker.get_servers_for_index(si) |
1914 | | peerid0, connection0 = peerlist[0] |
1915 | | peerid1, connection1 = peerlist[1] |
1916 | | connection0.broken = True |
1917 | | self.connection1 = connection1 |
| 1913 | servers = nm.storage_broker.get_servers_for_psi(si) |
| 1914 | self.g.break_server(servers[0].get_serverid()) |
| 1915 | self.server1 = servers[1] |
1918 | 1916 | d.addCallback(_break_peer0) |
1919 | 1917 | # now "create" the file, using the pre-established key, and let the |
1920 | 1918 | # initial publish finally happen |
… |
… |
class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin): |
1925 | 1923 | d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1")) |
1926 | 1924 | # now break the second peer |
1927 | 1925 | def _break_peer1(res): |
1928 | | self.connection1.broken = True |
| 1926 | self.g.break_server(self.server1.get_serverid()) |
1929 | 1927 | d.addCallback(_break_peer1) |
1930 | 1928 | d.addCallback(lambda res: n.overwrite("contents 2")) |
1931 | 1929 | # that ought to work too |
… |
… |
class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin): |
1956 | 1954 | nm = self.g.clients[0].nodemaker |
1957 | 1955 | sb = nm.storage_broker |
1958 | 1956 | |
1959 | | peerids = [serverid for (serverid,ss) in sb.get_all_servers()] |
| 1957 | peerids = [s.get_serverid() for s in sb.get_connected_servers()] |
1960 | 1958 | self.g.break_server(peerids[0]) |
1961 | 1959 | |
1962 | 1960 | d = nm.create_mutable_file("contents 1") |
… |
… |
class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin): |
1980 | 1978 | self.basedir = "mutable/Problems/test_publish_all_servers_bad" |
1981 | 1979 | self.set_up_grid() |
1982 | 1980 | nm = self.g.clients[0].nodemaker |
1983 | | for (serverid,ss) in nm.storage_broker.get_all_servers(): |
1984 | | ss.broken = True |
| 1981 | for s in nm.storage_broker.get_connected_servers(): |
| 1982 | s.get_rref().broken = True |
1985 | 1983 | |
1986 | 1984 | d = self.shouldFail(NotEnoughServersError, |
1987 | 1985 | "test_publish_all_servers_bad", |
… |
… |
class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin): |
2033 | 2031 | # 1. notice which server gets a read() call first |
2034 | 2032 | # 2. tell that server to start throwing errors |
2035 | 2033 | killer = FirstServerGetsKilled() |
2036 | | for (serverid,ss) in nm.storage_broker.get_all_servers(): |
2037 | | ss.post_call_notifier = killer.notify |
| 2034 | for s in nm.storage_broker.get_connected_servers(): |
| 2035 | s.get_rref().post_call_notifier = killer.notify |
2038 | 2036 | d.addCallback(_created) |
2039 | 2037 | |
2040 | 2038 | # now we update a servermap from a new node (which doesn't have the |
… |
… |
class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin): |
2059 | 2057 | self.uri = n.get_uri() |
2060 | 2058 | self.n2 = nm.create_from_cap(self.uri) |
2061 | 2059 | deleter = FirstServerGetsDeleted() |
2062 | | for (serverid,ss) in nm.storage_broker.get_all_servers(): |
2063 | | ss.post_call_notifier = deleter.notify |
| 2060 | for s in nm.storage_broker.get_connected_servers(): |
| 2061 | s.get_rref().post_call_notifier = deleter.notify |
2064 | 2062 | d.addCallback(_created) |
2065 | 2063 | d.addCallback(lambda res: self.n2.get_servermap(MODE_WRITE)) |
2066 | 2064 | return d |
-
diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py
index bf6af09..e37f663 100644
a
|
b
|
class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): |
65 | 65 | all_peerids = c.get_storage_broker().get_all_serverids() |
66 | 66 | self.failUnlessEqual(len(all_peerids), self.numclients+1) |
67 | 67 | sb = c.storage_broker |
68 | | permuted_peers = sb.get_servers_for_index("a") |
| 68 | permuted_peers = sb.get_servers_for_psi("a") |
69 | 69 | self.failUnlessEqual(len(permuted_peers), self.numclients+1) |
70 | 70 | |
71 | 71 | d.addCallback(_check) |
… |
… |
class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): |
101 | 101 | all_peerids = c.get_storage_broker().get_all_serverids() |
102 | 102 | self.failUnlessEqual(len(all_peerids), self.numclients) |
103 | 103 | sb = c.storage_broker |
104 | | permuted_peers = sb.get_servers_for_index("a") |
| 104 | permuted_peers = sb.get_servers_for_psi("a") |
105 | 105 | self.failUnlessEqual(len(permuted_peers), self.numclients) |
106 | 106 | d.addCallback(_check_connections) |
107 | 107 | |
-
diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py
index 022185e..1125932 100644
a
|
b
|
class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, |
1730 | 1730 | d.addCallback(lambda ign: |
1731 | 1731 | self._add_server(server_number=2)) |
1732 | 1732 | def _break_server_2(ign): |
1733 | | server = self.g.servers_by_number[2].my_nodeid |
1734 | | # We have to break the server in servers_by_id, |
1735 | | # because the one in servers_by_number isn't wrapped, |
1736 | | # and doesn't look at its broken attribute when answering |
1737 | | # queries. |
1738 | | self.g.servers_by_id[server].broken = True |
| 1733 | serverid = self.g.servers_by_number[2].my_nodeid |
| 1734 | self.g.break_server(serverid) |
1739 | 1735 | d.addCallback(_break_server_2) |
1740 | 1736 | d.addCallback(lambda ign: |
1741 | 1737 | self._add_server(server_number=3, readonly=True)) |
-
diff --git a/src/allmydata/web/check_results.py b/src/allmydata/web/check_results.py
index 96ce17b..44d0506 100644
a
|
b
|
class ResultsBase: |
139 | 139 | |
140 | 140 | # this table is sorted by permuted order |
141 | 141 | sb = c.get_storage_broker() |
142 | | permuted_peer_ids = [peerid |
143 | | for (peerid, rref) |
144 | | in sb.get_servers_for_index(cr.get_storage_index())] |
| 142 | permuted_peer_ids = [s.get_serverid() |
| 143 | for s |
| 144 | in sb.get_servers_for_psi(cr.get_storage_index())] |
145 | 145 | |
146 | 146 | num_shares_left = sum([len(shares) for shares in servers.values()]) |
147 | 147 | servermap = [] |
-
diff --git a/src/allmydata/web/root.py b/src/allmydata/web/root.py
index 3af15d9..2f1f6d4 100644
a
|
b
|
class Root(rend.Page): |
247 | 247 | |
248 | 248 | def data_connected_storage_servers(self, ctx, data): |
249 | 249 | sb = self.client.get_storage_broker() |
250 | | return len(sb.get_all_servers()) |
| 250 | return len(sb.get_connected_servers()) |
251 | 251 | |
252 | 252 | def data_services(self, ctx, data): |
253 | 253 | sb = self.client.get_storage_broker() |
254 | | return sb.get_all_descriptors() |
| 254 | return sb.get_known_servers() |
255 | 255 | |
256 | | def render_service_row(self, ctx, descriptor): |
257 | | nodeid = descriptor.get_serverid() |
| 256 | def render_service_row(self, ctx, server): |
| 257 | nodeid = server.get_serverid() |
258 | 258 | |
259 | 259 | ctx.fillSlots("peerid", idlib.nodeid_b2a(nodeid)) |
260 | | ctx.fillSlots("nickname", descriptor.get_nickname()) |
261 | | rhost = descriptor.get_remote_host() |
| 260 | ctx.fillSlots("nickname", server.get_nickname()) |
| 261 | rhost = server.get_remote_host() |
262 | 262 | if rhost: |
263 | 263 | if nodeid == self.client.nodeid: |
264 | 264 | rhost_s = "(loopback)" |
… |
… |
class Root(rend.Page): |
267 | 267 | else: |
268 | 268 | rhost_s = str(rhost) |
269 | 269 | connected = "Yes: to " + rhost_s |
270 | | since = descriptor.get_last_connect_time() |
| 270 | since = server.get_last_connect_time() |
271 | 271 | else: |
272 | 272 | connected = "No" |
273 | | since = descriptor.get_last_loss_time() |
274 | | announced = descriptor.get_announcement_time() |
275 | | announcement = descriptor.get_announcement() |
| 273 | since = server.get_last_loss_time() |
| 274 | announced = server.get_announcement_time() |
| 275 | announcement = server.get_announcement() |
276 | 276 | version = announcement["my-version"] |
277 | 277 | service_name = announcement["service-name"] |
278 | 278 | |