source: trunk/src/allmydata/introducer/server.py

Last change on this file was 2243ce3, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-28T00:07:08Z

remove "from past.builtins import long"

  • Property mode set to 100644
File size: 14.7 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5from __future__ import annotations
6
7from six import ensure_text
8
9import time, os.path, textwrap
10from typing import Any, Union
11
12from zope.interface import implementer
13from twisted.application import service
14from twisted.internet import defer
15from twisted.internet.address import IPv4Address
16from twisted.python.failure import Failure
17from foolscap.api import Referenceable
18import allmydata
19from allmydata import node
20from allmydata.util import log, dictutil
21from allmydata.util.i2p_provider import create as create_i2p_provider
22from allmydata.util.tor_provider import create as create_tor_provider
23from allmydata.introducer.interfaces import \
24     RIIntroducerPublisherAndSubscriberService_v2
25from allmydata.introducer.common import unsign_from_foolscap, \
26     SubscriberDescriptor, AnnouncementDescriptor
27from allmydata.node import read_config
28from allmydata.node import create_node_dir
29from allmydata.node import create_connection_handlers
30from allmydata.node import create_tub_options
31from allmydata.node import create_main_tub
32
33
# Text for the README file of a newly created introducer node directory;
# create_introducer() hands it to create_node_dir() when the base directory
# does not exist yet.
INTRODUCER_README = """
This directory contains files which contain private data for the Tahoe node,
such as private keys.  On Unix-like systems, the permissions on this directory
are set to disallow users other than its owner from reading the contents of
the files.   See the 'configuration.rst' documentation file for details.
"""

# Alias for the common-node valid-config factory. create_introducer() calls
# it and passes the result to read_config() as its ``_valid_config`` argument.
_valid_config = node._common_valid_config
43
class FurlFileConflictError(Exception):
    """
    Raised when a node directory holds both an old public 'introducer.furl'
    and a new-style 'private/introducer.furl', so the old file cannot be
    moved into place safely.
    """
46
def create_introducer(basedir=u"."):
    """
    Build an introducer node rooted at ``basedir``.

    If ``basedir`` does not exist it is created via ``create_node_dir`` with
    ``INTRODUCER_README``. The configuration is then read, the I2P and Tor
    providers, connection handlers, Tub options and main Tub are created, and
    all of them are handed to ``_IntroducerNode``. Both providers are attached
    to the new node as child services.

    :param basedir: path of the node directory (defaults to the current
        directory)

    :returns: a Deferred that yields a new _IntroducerNode instance. If
        anything raises while the node is being built, the Deferred fails
        with that exception instead of the exception propagating
        synchronously.
    """
    try:
        # see https://tahoe-lafs.org/trac/tahoe-lafs/ticket/2946
        from twisted.internet import reactor

        if not os.path.exists(basedir):
            create_node_dir(basedir, INTRODUCER_README)

        config = read_config(
            basedir, u"client.port",
            generated_files=["introducer.furl"],
            _valid_config=_valid_config(),
        )

        i2p_provider = create_i2p_provider(reactor, config)
        tor_provider = create_tor_provider(reactor, config)

        default_connection_handlers, foolscap_connection_handlers = create_connection_handlers(config, i2p_provider, tor_provider)
        tub_options = create_tub_options(config)

        main_tub = create_main_tub(
            config, tub_options, default_connection_handlers,
            foolscap_connection_handlers, i2p_provider, tor_provider,
        )

        # Named so that it does not shadow the imported ``node`` module.
        introducer_node = _IntroducerNode(
            config,
            main_tub,
            i2p_provider,
            tor_provider,
        )
        i2p_provider.setServiceParent(introducer_node)
        tor_provider.setServiceParent(introducer_node)
        return defer.succeed(introducer_node)
    except Exception:
        # Failure() captures the exception currently being handled. Wrap it
        # in a failed Deferred so the return value is always a Deferred, as
        # documented; a bare Failure has no addCallback/addErrback.
        return defer.fail(Failure())
86
87
class _IntroducerNode(node.Node):
    """
    The node type that hosts an ``IntroducerService``.

    On construction it registers the introducer service with the node's Tub
    (persisting the FURL in the private directory) and, when ``web.port`` is
    configured in the ``[node]`` section, starts the introducer web server.
    """
    NODETYPE = "introducer"

    def __init__(self, config, main_tub, i2p_provider, tor_provider):
        """
        Initialise the base node, then the introducer service, then the
        optional web front-end.

        :raises ValueError: (from ``init_introducer``) if the Tub is not
            listening
        :raises FurlFileConflictError: (from ``init_introducer``) if both the
            old public and the private FURL files exist
        """
        node.Node.__init__(self, config, main_tub, i2p_provider, tor_provider)
        self.init_introducer()
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def init_introducer(self):
        """
        Create the ``IntroducerService``, attach it to this node and register
        it with the Tub, storing the FURL in the private 'introducer.furl'.

        An old-style public 'introducer.furl' is moved to the private
        location when no private file exists yet. The resulting FURL is kept
        on ``self.introducer_url``.

        :raises ValueError: if the Tub is not listening
        :raises FurlFileConflictError: if both the public and the private
            FURL file exist
        """
        if not self._is_tub_listening():
            raise ValueError("config error: we are Introducer, but tub "
                             "is not listening ('tub.port=' is empty)")
        introducerservice = IntroducerService()
        introducerservice.setServiceParent(self)

        old_public_fn = self.config.get_config_path(u"introducer.furl")
        private_fn = self.config.get_private_path(u"introducer.furl")

        if os.path.exists(old_public_fn):
            if os.path.exists(private_fn):
                # The directory reported to the user is the one that holds
                # the old public file. Dedent before interpolating so the
                # path's content cannot influence the dedent.
                msg = textwrap.dedent("""\
                    This directory (%s) contains both an old public
                    'introducer.furl' file, and a new-style
                    'private/introducer.furl', so I cannot safely remove the old
                    one. Please make sure your desired FURL is in
                    private/introducer.furl, and remove the public file. If this
                    causes your Introducer's FURL to change, you need to inform
                    all grid members so they can update their tahoe.cfg.
                    """) % (os.path.dirname(old_public_fn),)
                raise FurlFileConflictError(msg)
            # Only the old public file exists: migrate it.
            os.rename(old_public_fn, private_fn)
        furl = self.tub.registerReference(introducerservice,
                                          furlFile=private_fn)
        self.log(" introducer can be found in {!r}".format(private_fn), umid="qF2L9A")
        self.introducer_url = furl # for tests

    def init_web(self, webport):
        """
        Start the introducer web server as a child service of this node.

        :param webport: the configured ``web.port`` value (a strports string)
        """
        self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA")

        # Imported here rather than at module level, as in the original.
        from allmydata.webish import IntroducerWebishServer
        nodeurl_path = self.config.get_config_path(u"node.url")
        # 'web.static' defaults to "public_html" and is resolved against the
        # node's config directory.
        config_staticdir = self.get_config("node", "web.static", "public_html")
        staticdir = self.config.get_config_path(config_staticdir)
        ws = IntroducerWebishServer(self, webport, nodeurl_path, staticdir)
        ws.setServiceParent(self)
134
135
def stringify_remote_address(rref):
    """
    Return a printable form of the peer address of ``rref``.

    :param rref: an object whose ``getPeer()`` returns the remote address
    :returns: ``"host:port"`` when the peer is an ``IPv4Address``, otherwise
        ``str()`` of whatever ``getPeer()`` returned
    """
    peer = rref.getPeer()
    if not isinstance(peer, IPv4Address):
        # loopback is a non-IPv4Address
        return str(peer)
    return "%s:%d" % (peer.host, peer.port)
142
143
144# MyPy doesn't work well with remote interfaces...
@implementer(RIIntroducerPublisherAndSubscriberService_v2)
class IntroducerService(service.MultiService, Referenceable):  # type: ignore[misc]
    """
    The v2 introducer: accepts signed announcements from publishers and
    forwards them to the subscribers of the matching service name.

    Announcements and subscriptions are held in memory only (see
    ``_announcements`` and ``_subscribers`` in ``__init__``).
    """
    # The type in Twisted for services is wrong in 22.10...
    # https://github.com/twisted/twisted/issues/10135
    name = "introducer"  # type: ignore[assignment]
    # v1 is the original protocol, added in 1.0 (but only advertised starting
    # in 1.3), removed in 1.12. v2 is the new signed protocol, added in 1.10
    # TODO: reconcile bytes/str for keys
    VERSION: dict[Union[bytes, str], Any] = {
                #"http://allmydata.org/tahoe/protocols/introducer/v1": { },
                b"http://allmydata.org/tahoe/protocols/introducer/v2": { },
                b"application-version": allmydata.__full_version__.encode("utf-8"),
                }

    def __init__(self):
        service.MultiService.__init__(self)
        self.introducer_url = None
        # 'index' is (service_name, key), as built in _publish() from the
        # announcement's "service-name" and the key returned by
        # unsign_from_foolscap()
        self._announcements = {} # dict of index ->
                                 # (ann_t, canary, ann, timestamp)

        # ann (the announcement dictionary) is cleaned up: nickname is always
        # unicode, servicename is always ascii, etc, even though
        # simplejson.loads sometimes returns either

        # self._subscribers is a dict mapping servicename to subscriptions
        # 'subscriptions' is a dict mapping rref to a subscription
        # 'subscription' is a tuple of (subscriber_info, timestamp)
        # 'subscriber_info' is a dict, provided directly by v2 clients. The
        # expected keys are: version, nickname, app-versions, my-version,
        # oldest-supported
        self._subscribers = dictutil.UnicodeKeyDict({})

        # Event counters, incremented by _publish() and add_subscriber().
        self._debug_counts = {"inbound_message": 0,
                              "inbound_duplicate": 0,
                              "inbound_no_seqnum": 0,
                              "inbound_old_replay": 0,
                              "inbound_update": 0,
                              "outbound_message": 0,
                              "outbound_announcements": 0,
                              "inbound_subscribe": 0}
        # Number of "announce_v2" calls sent whose Deferred has not fired yet.
        self._debug_outstanding = 0

    def _debug_retired(self, res):
        """
        Deferred callback/errback: note that one outstanding "announce_v2"
        call has completed and pass ``res`` through unchanged.
        """
        self._debug_outstanding -= 1
        return res

    def log(self, *args, **kwargs):
        """
        Forward to ``log.msg``, defaulting ``facility`` to
        "tahoe.introducer.server" when the caller did not supply one.

        :returns: whatever ``log.msg`` returns
        """
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.introducer.server"
        return log.msg(*args, **kwargs)

    def get_announcements(self):
        """Return a list of AnnouncementDescriptor for all announcements"""
        return [
            AnnouncementDescriptor(when, index, canary, ann)
            for (index, (_ann_t, canary, ann, when))
            in self._announcements.items()
        ]

    def get_subscribers(self):
        """Return a list of SubscriberDescriptor objects for all subscribers"""
        s = []
        for service_name, subscriptions in self._subscribers.items():
            for rref, (subscriber_info, when) in subscriptions.items():
                # note that if the subscriber didn't do Tub.setLocation,
                # tubid will be None. Also, subscribers do not tell us which
                # pubkey they use; only publishers do that.
                tubid = rref.getRemoteTubID() or "?"
                remote_address = stringify_remote_address(rref)
                # these three assume subscriber_info["version"]==0, but
                # should tolerate other versions
                nickname = subscriber_info.get("nickname", u"?")
                version = subscriber_info.get("my-version", u"?")
                app_versions = subscriber_info.get("app-versions", {})
                # 'when' is the time they subscribed
                sd = SubscriberDescriptor(service_name, when,
                                          nickname, version, app_versions,
                                          remote_address, tubid)
                s.append(sd)
        return s

    def remote_get_version(self):
        """Remotely callable: return the ``VERSION`` dictionary."""
        return self.VERSION

    def remote_publish_v2(self, ann_t, canary):
        """
        Remotely callable: log the arrival of a v2 announcement and hand it
        to ``publish``.

        :param ann_t: the signed announcement as received
        :param canary: the canary object supplied by the publisher
        """
        lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ")
        return self.publish(ann_t, canary, lp)

    def publish(self, ann_t, canary, lp):
        """
        Process one announcement via ``_publish``; on any exception, log it
        (with ``lp`` as the parent log entry) and re-raise.
        """
        try:
            self._publish(ann_t, canary, lp)
        except:
            # Bare except is acceptable here: the exception is always
            # re-raised after being logged.
            log.err(format="Introducer.remote_publish failed on %(ann)s",
                    ann=ann_t,
                    level=log.UNUSUAL, parent=lp, umid="620rWA")
            raise

    def _publish(self, ann_t, canary, lp):
        """
        Verify, record and distribute a single announcement.

        An announcement identical to the stored one for the same index is
        ignored. If the stored announcement has a "seqnum", the new one must
        carry an integer "seqnum" strictly greater than it, otherwise it is
        ignored. An accepted announcement is stored together with ``canary``
        and the current time, and sent to every subscriber of its service
        name.
        """
        self._debug_counts["inbound_message"] += 1
        self.log("introducer: announcement published: %s" % (ann_t,),
                 umid="wKHgCw")
        ann, key = unsign_from_foolscap(ann_t) # might raise BadSignature
        service_name = str(ann["service-name"])

        index = (service_name, key)
        old = self._announcements.get(index)
        if old:
            # Unpack into separate names: reusing `canary` here would
            # overwrite the parameter and store the *old* canary with the
            # new announcement below.
            (_old_ann_t, _old_canary, old_ann, _old_timestamp) = old
            if old_ann == ann:
                self.log("but we already knew it, ignoring", level=log.NOISY,
                         umid="myxzLw")
                self._debug_counts["inbound_duplicate"] += 1
                return
            if "seqnum" in old_ann:
                # must beat previous sequence number to replace
                if ("seqnum" not in ann
                    or not isinstance(ann["seqnum"], int)):
                    self.log("not replacing old ann, no valid seqnum",
                             level=log.NOISY, umid="ySbaVw")
                    self._debug_counts["inbound_no_seqnum"] += 1
                    return
                if ann["seqnum"] <= old_ann["seqnum"]:
                    self.log("not replacing old ann, new seqnum is too old"
                             " (%s <= %s) (replay attack?)"
                             % (ann["seqnum"], old_ann["seqnum"]),
                             level=log.UNUSUAL, umid="sX7yqQ")
                    self._debug_counts["inbound_old_replay"] += 1
                    return
                # ok, seqnum is newer, allow replacement
            self.log("old announcement being updated", level=log.NOISY,
                     umid="304r9g")
            self._debug_counts["inbound_update"] += 1
        self._announcements[index] = (ann_t, canary, ann, time.time())
        #if canary:
        #    canary.notifyOnDisconnect ...
        # use a CanaryWatcher? with cw.is_connected()?
        # actually we just want foolscap to give rref.is_connected(), since
        # this is only for the status display

        # Iterating the per-service dict yields its keys, the subscriber
        # rrefs.
        for s in self._subscribers.get(service_name, []):
            self._debug_counts["outbound_message"] += 1
            self._debug_counts["outbound_announcements"] += 1
            self._debug_outstanding += 1
            d = s.callRemote("announce_v2", {ann_t})
            d.addBoth(self._debug_retired)
            d.addErrback(log.err,
                         format="subscriber errored on announcement %(ann)s",
                         ann=ann_t, facility="tahoe.introducer",
                         level=log.UNUSUAL, umid="jfGMXQ")

    def remote_subscribe_v2(self, subscriber, service_name, subscriber_info):
        """
        Remotely callable: normalise ``service_name`` and the keys of
        ``subscriber_info`` to text, then register the subscription through
        ``add_subscriber``.
        """
        self.log("introducer: subscription[%r] request at %r"
                 % (service_name, subscriber), umid="U3uzLg")
        service_name = ensure_text(service_name)
        subscriber_info = dictutil.UnicodeKeyDict({
            ensure_text(k): v for (k, v) in subscriber_info.items()
        })
        return self.add_subscriber(subscriber, service_name, subscriber_info)

    def add_subscriber(self, subscriber, service_name, subscriber_info):
        """
        Register ``subscriber`` for ``service_name`` and send it every stored
        announcement for that service.

        A subscriber that is already registered for the service is ignored.
        The subscription is dropped again when the subscriber disconnects.

        :returns: the Deferred of the "announce_v2" call when there were
            announcements to send, otherwise None
        """
        self._debug_counts["inbound_subscribe"] += 1
        if service_name not in self._subscribers:
            self._subscribers[service_name] = {}
        subscribers = self._subscribers[service_name]
        if subscriber in subscribers:
            self.log("but they're already subscribed, ignoring",
                     level=log.UNUSUAL, umid="Sy9EfA")
            return

        assert subscriber_info

        subscribers[subscriber] = (subscriber_info, time.time())
        def _remove():
            self.log("introducer: unsubscribing[%s] %s" % (service_name,
                                                           subscriber),
                     umid="vYGcJg")
            subscribers.pop(subscriber, None)
        subscriber.notifyOnDisconnect(_remove)

        # Make sure types are correct:
        for k in self._announcements:
            assert isinstance(k[0], type(service_name))

        # now tell them about any announcements they're interested in
        announcements = {
            ann_t
            for (idx, (ann_t, _canary, _ann, _when))
            in self._announcements.items()
            if idx[0] == service_name
        }
        if announcements:
            self._debug_counts["outbound_message"] += 1
            self._debug_counts["outbound_announcements"] += len(announcements)
            self._debug_outstanding += 1
            d = subscriber.callRemote("announce_v2", announcements)
            d.addBoth(self._debug_retired)
            d.addErrback(log.err,
                         format="subscriber errored during subscribe %(anns)s",
                         anns=announcements, facility="tahoe.introducer",
                         level=log.UNUSUAL, umid="mtZepQ")
            return d
Note: See TracBrowser for help on using the repository browser.