source: trunk/src/allmydata/test/cli/wormholetesting.py

Last change on this file was f42fb1e, checked in by Itamar Turner-Trauring <itamar@…>, at 2023-06-14T18:16:35Z

Unused import

  • Property mode set to 100644
File size: 10.1 KB
Line 
1"""
2An in-memory implementation of some of the magic-wormhole interfaces for
3use by automated tests.
4
5For example::
6
7    async def peerA(mw):
8        wormhole = mw.create("myapp", "wss://myserver", reactor)
9        code = await wormhole.get_code()
10        print(f"I have a code: {code}")
11        message = await wormhole.when_received()
12        print(f"I have a message: {message}")
13
14    async def local_peerB(helper, mw):
15        peerA_wormhole = await helper.wait_for_wormhole("myapp", "wss://myserver")
16        code = await peerA_wormhole.when_code()
17
18        peerB_wormhole = mw.create("myapp", "wss://myserver")
19        peerB_wormhole.set_code(code)
20
21        peerB_wormhole.send_message("Hello, peer A")
22
23    # Run peerA against local_peerB with pure in-memory message passing.
24    server, helper = memory_server()
25    run(gather(peerA(server), local_peerB(helper, server)))
26
27    # Run peerA against a peerB somewhere out in the world, using a real
28    # wormhole relay server somewhere.
29    import wormhole
30    run(peerA(wormhole))
31"""
32
33from __future__ import annotations
34
35__all__ = ['MemoryWormholeServer', 'TestingHelper', 'memory_server', 'IWormhole']
36
37from typing import Iterator, Optional, List, Tuple, Any, TextIO
38from inspect import getfullargspec
39from itertools import count
40from sys import stderr
41
42from attrs import frozen, define, field, Factory
43from twisted.internet.defer import Deferred, DeferredQueue, succeed
44from wormhole._interfaces import IWormhole
45from wormhole.wormhole import create
46from zope.interface import implementer
47
48WormholeCode = str
49WormholeMessage = bytes
50AppId = str
51RelayURL = str
52ApplicationKey = Tuple[RelayURL, AppId]
53
54@define
55class MemoryWormholeServer(object):
56    """
57    A factory for in-memory wormholes.
58
59    :ivar _apps: Wormhole state arranged by the application id and relay URL
60        it belongs to.
61
62    :ivar _waiters: Observers waiting for a wormhole to be created for a
63        specific application id and relay URL combination.
64    """
65    _apps: dict[ApplicationKey, _WormholeApp] = field(default=Factory(dict))
66    _waiters: dict[ApplicationKey, Deferred[IWormhole]] = field(default=Factory(dict))
67
68    def create(
69        self,
70        appid: str,
71        relay_url: str,
72        reactor: Any,
73        # Unfortunately we need a mutable default to match the real API
74        versions: Any={},  # noqa: B006
75        delegate: Optional[Any]=None,
76        journal: Optional[Any]=None,
77        tor: Optional[Any]=None,
78        timing: Optional[Any]=None,
79        stderr: TextIO=stderr,
80        _eventual_queue: Optional[Any]=None,
81        _enable_dilate: bool=False,
82    ) -> _MemoryWormhole:
83        """
84        Create a wormhole.  It will be able to connect to other wormholes created
85        by this instance (and constrained by the normal appid/relay_url
86        rules).
87        """
88        if tor is not None:
89            raise ValueError("Cannot deal with Tor right now.")
90        if _enable_dilate:
91            raise ValueError("Cannot deal with dilation right now.")
92
93        key = (relay_url, appid)
94        wormhole = _MemoryWormhole(self._view(key))
95        if key in self._waiters:
96            self._waiters.pop(key).callback(wormhole)
97        return wormhole
98
99    def _view(self, key: ApplicationKey) -> _WormholeServerView:
100        """
101        Created a view onto this server's state that is limited by a certain
102        appid/relay_url pair.
103        """
104        return _WormholeServerView(self, key)
105
106
107@frozen
108class TestingHelper(object):
109    """
110    Provide extra functionality for interacting with an in-memory wormhole
111    implementation.
112
113    This is intentionally a separate API so that it is not confused with
114    proper public interface of the real wormhole implementation.
115    """
116    _server: MemoryWormholeServer
117
118    async def wait_for_wormhole(self, appid: AppId, relay_url: RelayURL) -> IWormhole:
119        """
120        Wait for a wormhole to appear at a specific location.
121
122        :param appid: The appid that the resulting wormhole will have.
123
124        :param relay_url: The URL of the relay at which the resulting wormhole
125            will presume to be created.
126
127        :return: The first wormhole to be created which matches the given
128            parameters.
129        """
130        key = (relay_url, appid)
131        if key in self._server._waiters:
132            raise ValueError(f"There is already a waiter for {key}")
133        d : Deferred[IWormhole] = Deferred()
134        self._server._waiters[key] = d
135        wormhole = await d
136        return wormhole
137
138
139def _verify() -> None:
140    """
141    Roughly confirm that the in-memory wormhole creation function matches the
142    interface of the real implementation.
143    """
144    # Poor man's interface verification.
145
146    a = getfullargspec(create)
147    b = getfullargspec(MemoryWormholeServer.create)
148    # I know it has a `self` argument at the beginning.  That's okay.
149    b = b._replace(args=b.args[1:])
150
151    # Just compare the same information to check function signature
152    assert a.varkw == b.varkw
153    assert a.args == b.args
154    assert a.varargs == b.varargs
155    assert a.kwonlydefaults == b.kwonlydefaults
156    assert a.defaults == b.defaults
157
158
159_verify()
160
161
162@define
163class _WormholeApp(object):
164    """
165    Represent a collection of wormholes that belong to the same
166    appid/relay_url scope.
167    """
168    wormholes: dict[WormholeCode, IWormhole] = field(default=Factory(dict))
169    _waiting: dict[WormholeCode, List[Deferred[_MemoryWormhole]]] = field(default=Factory(dict))
170    _counter: Iterator[int] = field(default=Factory(count))
171
172    def allocate_code(self, wormhole: IWormhole, code: Optional[WormholeCode]) -> WormholeCode:
173        """
174        Allocate a new code for the given wormhole.
175
176        This also associates the given wormhole with the code for future
177        lookup.
178
179        Code generation logic is trivial and certainly not good enough for any
180        real use.  It is sufficient for automated testing, though.
181        """
182        if code is None:
183            code = "{}-persnickety-tardigrade".format(next(self._counter))
184        self.wormholes.setdefault(code, []).append(wormhole)
185        try:
186            waiters = self._waiting.pop(code)
187        except KeyError:
188            pass
189        else:
190            for w in waiters:
191                w.callback(wormhole)
192
193        return code
194
195    def wait_for_wormhole(self, code: WormholeCode) -> Deferred[_MemoryWormhole]:
196        """
197        Return a ``Deferred`` which fires with the next wormhole to be associated
198        with the given code.  This is used to let the first end of a wormhole
199        rendezvous with the second end.
200        """
201        d : Deferred[_MemoryWormhole] = Deferred()
202        self._waiting.setdefault(code, []).append(d)
203        return d
204
205
206@frozen
207class _WormholeServerView(object):
208    """
209    Present an interface onto the server to be consumed by individual
210    wormholes.
211    """
212    _server: MemoryWormholeServer
213    _key: ApplicationKey
214
215    def allocate_code(self, wormhole: _MemoryWormhole, code: Optional[WormholeCode]) -> WormholeCode:
216        """
217        Allocate a new code for the given wormhole in the scope associated with
218        this view.
219        """
220        app = self._server._apps.setdefault(self._key, _WormholeApp())
221        return app.allocate_code(wormhole, code)
222
223    def wormhole_by_code(self, code: WormholeCode, exclude: object) -> Deferred[IWormhole]:
224        """
225        Retrieve all wormholes previously associated with a code.
226        """
227        app = self._server._apps[self._key]
228        wormholes = app.wormholes[code]
229        try:
230            [wormhole] = list(wormhole for wormhole in wormholes if wormhole != exclude)
231        except ValueError:
232            return app.wait_for_wormhole(code)
233        return succeed(wormhole)
234
235
236@implementer(IWormhole)
237@define
238class _MemoryWormhole(object):
239    """
240    Represent one side of a wormhole as conceived by ``MemoryWormholeServer``.
241    """
242
243    _view: _WormholeServerView
244    _code: Optional[WormholeCode] = None
245    _payload: DeferredQueue[WormholeMessage] = field(default=Factory(DeferredQueue))
246    _waiting_for_code: list[Deferred[WormholeCode]] = field(default=Factory(list))
247
248    def allocate_code(self) -> None:
249        if self._code is not None:
250            raise ValueError(
251                "allocate_code used with a wormhole which already has a code"
252            )
253        self._code = self._view.allocate_code(self, None)
254        waiters = self._waiting_for_code
255        self._waiting_for_code = []
256        for d in waiters:
257            d.callback(self._code)
258
259    def set_code(self, code: WormholeCode) -> None:
260        if self._code is None:
261            self._code = code
262            self._view.allocate_code(self, code)
263        else:
264            raise ValueError("set_code used with a wormhole which already has a code")
265
266    def when_code(self) -> Deferred[WormholeCode]:
267        if self._code is None:
268            d : Deferred[WormholeCode] = Deferred()
269            self._waiting_for_code.append(d)
270            return d
271        return succeed(self._code)
272
273    def get_welcome(self) -> Deferred[str]:
274        return succeed("welcome")
275
276    def send_message(self, payload: WormholeMessage) -> None:
277        self._payload.put(payload)
278
279    def when_received(self) -> Deferred[WormholeMessage]:
280        if self._code is None:
281            raise ValueError(
282                "This implementation requires set_code or allocate_code "
283                "before when_received."
284            )
285        d = self._view.wormhole_by_code(self._code, exclude=self)
286
287        def got_wormhole(wormhole: _MemoryWormhole) -> Deferred[WormholeMessage]:
288            msg: Deferred[WormholeMessage] = wormhole._payload.get()
289            return msg
290
291        d.addCallback(got_wormhole)
292        return d
293
294    get_message = when_received
295
296    def close(self) -> None:
297        pass
298
299    # 0.9.2 compatibility
300    def get_code(self) -> Deferred[WormholeCode]:
301        if self._code is None:
302            self.allocate_code()
303        return self.when_code()
304
305    get = when_received
306
307
308def memory_server() -> tuple[MemoryWormholeServer, TestingHelper]:
309    """
310    Create a paired in-memory wormhole server and testing helper.
311    """
312    server = MemoryWormholeServer()
313    return server, TestingHelper(server)
Note: See TracBrowser for help on using the repository browser.