source: trunk/src/allmydata/test/test_util.py

Last change on this file was 53084f7, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-27T23:49:07Z

remove more Python2 compatibility

  • Property mode set to 100644
File size: 24.4 KB
Line 
1"""
2Ported to Python3.
3"""
4
5import os, time, sys
6import yaml
7import json
8from threading import current_thread
9
10from twisted.trial import unittest
11from foolscap.api import Violation, RemoteException
12
13from allmydata.util import idlib, mathutil
14from allmydata.util import fileutil
15from allmydata.util import jsonbytes
16from allmydata.util import pollmixin
17from allmydata.util import yamlutil
18from allmydata.util import rrefutil
19from allmydata.util.fileutil import EncryptedTemporaryFile
20from allmydata.util.cputhreadpool import defer_to_thread, disable_thread_pool_for_test
21from allmydata.test.common_util import ReallyEqualMixin
22from .no_network import fireNow, LocalWrapper
23
24long = int
25
26
27class IDLib(unittest.TestCase):
28    def test_nodeid_b2a(self):
29        result = idlib.nodeid_b2a(b"\x00"*20)
30        self.assertEqual(result, "a"*32)
31        self.assertIsInstance(result, str)
32
33
34class MyList(list):
35    pass
36
37class Math(unittest.TestCase):
38    def test_round_sigfigs(self):
39        f = mathutil.round_sigfigs
40        self.failUnlessEqual(f(22.0/3, 4), 7.3330000000000002)
41
42
43class FileUtil(ReallyEqualMixin, unittest.TestCase):
44    def mkdir(self, basedir, path, mode=0o777):
45        fn = os.path.join(basedir, path)
46        fileutil.make_dirs(fn, mode)
47
48    def touch(self, basedir, path, mode=None, data="touch\n"):
49        fn = os.path.join(basedir, path)
50        f = open(fn, "w")
51        f.write(data)
52        f.close()
53        if mode is not None:
54            os.chmod(fn, mode)
55
56    def test_rm_dir(self):
57        basedir = "util/FileUtil/test_rm_dir"
58        fileutil.make_dirs(basedir)
59        # create it again to test idempotency
60        fileutil.make_dirs(basedir)
61        d = os.path.join(basedir, "doomed")
62        self.mkdir(d, "a/b")
63        self.touch(d, "a/b/1.txt")
64        self.touch(d, "a/b/2.txt", 0o444)
65        self.touch(d, "a/b/3.txt", 0)
66        self.mkdir(d, "a/c")
67        self.touch(d, "a/c/1.txt")
68        self.touch(d, "a/c/2.txt", 0o444)
69        self.touch(d, "a/c/3.txt", 0)
70        os.chmod(os.path.join(d, "a/c"), 0o444)
71        self.mkdir(d, "a/d")
72        self.touch(d, "a/d/1.txt")
73        self.touch(d, "a/d/2.txt", 0o444)
74        self.touch(d, "a/d/3.txt", 0)
75        os.chmod(os.path.join(d, "a/d"), 0)
76
77        fileutil.rm_dir(d)
78        self.failIf(os.path.exists(d))
79        # remove it again to test idempotency
80        fileutil.rm_dir(d)
81
82    def test_remove_if_possible(self):
83        basedir = "util/FileUtil/test_remove_if_possible"
84        fileutil.make_dirs(basedir)
85        self.touch(basedir, "here")
86        fn = os.path.join(basedir, "here")
87        fileutil.remove_if_possible(fn)
88        self.failIf(os.path.exists(fn))
89        fileutil.remove_if_possible(fn) # should be idempotent
90        fileutil.rm_dir(basedir)
91        fileutil.remove_if_possible(fn) # should survive errors
92
93    def test_write_atomically(self):
94        basedir = "util/FileUtil/test_write_atomically"
95        fileutil.make_dirs(basedir)
96        fn = os.path.join(basedir, "here")
97        fileutil.write_atomically(fn, b"one", "b")
98        self.failUnlessEqual(fileutil.read(fn), b"one")
99        fileutil.write_atomically(fn, u"two", mode="") # non-binary
100        self.failUnlessEqual(fileutil.read(fn), b"two")
101
102    def test_rename(self):
103        basedir = "util/FileUtil/test_rename"
104        fileutil.make_dirs(basedir)
105        self.touch(basedir, "here")
106        fn = os.path.join(basedir, "here")
107        fn2 = os.path.join(basedir, "there")
108        fileutil.rename(fn, fn2)
109        self.failIf(os.path.exists(fn))
110        self.failUnless(os.path.exists(fn2))
111
112    def test_rename_no_overwrite(self):
113        workdir = fileutil.abspath_expanduser_unicode(u"test_rename_no_overwrite")
114        fileutil.make_dirs(workdir)
115
116        source_path = os.path.join(workdir, "source")
117        dest_path   = os.path.join(workdir, "dest")
118
119        # when neither file exists
120        self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
121
122        # when only dest exists
123        fileutil.write(dest_path,   b"dest")
124        self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
125        self.failUnlessEqual(fileutil.read(dest_path),   b"dest")
126
127        # when both exist
128        fileutil.write(source_path, b"source")
129        self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
130        self.failUnlessEqual(fileutil.read(source_path), b"source")
131        self.failUnlessEqual(fileutil.read(dest_path),   b"dest")
132
133        # when only source exists
134        os.remove(dest_path)
135        fileutil.rename_no_overwrite(source_path, dest_path)
136        self.failUnlessEqual(fileutil.read(dest_path), b"source")
137        self.failIf(os.path.exists(source_path))
138
139    def test_replace_file(self):
140        workdir = fileutil.abspath_expanduser_unicode(u"test_replace_file")
141        fileutil.make_dirs(workdir)
142
143        replaced_path    = os.path.join(workdir, "replaced")
144        replacement_path = os.path.join(workdir, "replacement")
145
146        # when none of the files exist
147        self.failUnlessRaises(fileutil.ConflictError, fileutil.replace_file, replaced_path, replacement_path)
148
149        # when only replaced exists
150        fileutil.write(replaced_path,   b"foo")
151        self.failUnlessRaises(fileutil.ConflictError, fileutil.replace_file, replaced_path, replacement_path)
152        self.failUnlessEqual(fileutil.read(replaced_path), b"foo")
153
154        # when both replaced and replacement exist
155        fileutil.write(replacement_path, b"bar")
156        fileutil.replace_file(replaced_path, replacement_path)
157        self.failUnlessEqual(fileutil.read(replaced_path), b"bar")
158        self.failIf(os.path.exists(replacement_path))
159
160        # when only replacement exists
161        os.remove(replaced_path)
162        fileutil.write(replacement_path, b"bar")
163        fileutil.replace_file(replaced_path, replacement_path)
164        self.failUnlessEqual(fileutil.read(replaced_path), b"bar")
165        self.failIf(os.path.exists(replacement_path))
166
167    def test_du(self):
168        basedir = "util/FileUtil/test_du"
169        fileutil.make_dirs(basedir)
170        d = os.path.join(basedir, "space-consuming")
171        self.mkdir(d, "a/b")
172        self.touch(d, "a/b/1.txt", data="a"*10)
173        self.touch(d, "a/b/2.txt", data="b"*11)
174        self.mkdir(d, "a/c")
175        self.touch(d, "a/c/1.txt", data="c"*12)
176        self.touch(d, "a/c/2.txt", data="d"*13)
177
178        used = fileutil.du(basedir)
179        self.failUnlessEqual(10+11+12+13, used)
180
181    def test_abspath_expanduser_unicode(self):
182        self.failUnlessRaises(AssertionError, fileutil.abspath_expanduser_unicode, b"bytestring")
183
184        saved_cwd = os.path.normpath(os.getcwd())
185        abspath_cwd = fileutil.abspath_expanduser_unicode(u".")
186        abspath_cwd_notlong = fileutil.abspath_expanduser_unicode(u".", long_path=False)
187        self.failUnless(isinstance(saved_cwd, str), saved_cwd)
188        self.failUnless(isinstance(abspath_cwd, str), abspath_cwd)
189        if sys.platform == "win32":
190            self.failUnlessReallyEqual(abspath_cwd, fileutil.to_windows_long_path(saved_cwd))
191        else:
192            self.failUnlessReallyEqual(abspath_cwd, saved_cwd)
193        self.failUnlessReallyEqual(abspath_cwd_notlong, saved_cwd)
194
195        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\?\\foo"), u"\\\\?\\foo")
196        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\.\\foo"), u"\\\\.\\foo")
197        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\server\\foo"), u"\\\\?\\UNC\\server\\foo")
198        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo"), u"\\\\?\\C:\\foo")
199        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo/bar"), u"\\\\?\\C:\\foo\\bar")
200
201        # adapted from <http://svn.python.org/view/python/branches/release26-maint/Lib/test/test_posixpath.py?view=markup&pathrev=78279#test_abspath>
202
203        foo = fileutil.abspath_expanduser_unicode(u"foo")
204        self.failUnless(foo.endswith(u"%sfoo" % (os.path.sep,)), foo)
205
206        foobar = fileutil.abspath_expanduser_unicode(u"bar", base=foo)
207        self.failUnless(foobar.endswith(u"%sfoo%sbar" % (os.path.sep, os.path.sep)), foobar)
208
209        if sys.platform == "win32":
210            # This is checking that a drive letter is added for a path without one.
211            baz = fileutil.abspath_expanduser_unicode(u"\\baz")
212            self.failUnless(baz.startswith(u"\\\\?\\"), baz)
213            self.failUnlessReallyEqual(baz[5 :], u":\\baz")
214
215            bar = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz)
216            self.failUnless(bar.startswith(u"\\\\?\\"), bar)
217            self.failUnlessReallyEqual(bar[5 :], u":\\bar")
218            # not u":\\baz\\bar", because \bar is absolute on the current drive.
219
220            self.failUnlessReallyEqual(baz[4], bar[4])  # same drive
221
222            baz_notlong = fileutil.abspath_expanduser_unicode(u"\\baz", long_path=False)
223            self.failIf(baz_notlong.startswith(u"\\\\?\\"), baz_notlong)
224            self.failUnlessReallyEqual(baz_notlong[1 :], u":\\baz")
225
226            bar_notlong = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz_notlong, long_path=False)
227            self.failIf(bar_notlong.startswith(u"\\\\?\\"), bar_notlong)
228            self.failUnlessReallyEqual(bar_notlong[1 :], u":\\bar")
229            # not u":\\baz\\bar", because \bar is absolute on the current drive.
230
231            self.failUnlessReallyEqual(baz_notlong[0], bar_notlong[0])  # same drive
232
233        self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~"))
234        self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~", long_path=False))
235
236        cwds = ['cwd']
237        try:
238            cwds.append(u'\xe7w\xf0'.encode(sys.getfilesystemencoding()
239                                            or 'ascii'))
240        except UnicodeEncodeError:
241            pass # the cwd can't be encoded -- test with ascii cwd only
242
243        for cwd in cwds:
244            try:
245                os.mkdir(cwd)
246                os.chdir(cwd)
247                for upath in (u'', u'fuu', u'f\xf9\xf9', u'/fuu', u'U:\\', u'~'):
248                    uabspath = fileutil.abspath_expanduser_unicode(upath)
249                    self.failUnless(isinstance(uabspath, str), uabspath)
250
251                    uabspath_notlong = fileutil.abspath_expanduser_unicode(upath, long_path=False)
252                    self.failUnless(isinstance(uabspath_notlong, str), uabspath_notlong)
253            finally:
254                os.chdir(saved_cwd)
255
256    def test_make_dirs_with_absolute_mode(self):
257        if sys.platform == 'win32':
258            raise unittest.SkipTest("Permissions don't work the same on windows.")
259
260        workdir = fileutil.abspath_expanduser_unicode(u"test_make_dirs_with_absolute_mode")
261        fileutil.make_dirs(workdir)
262        abspath = fileutil.abspath_expanduser_unicode(u"a/b/c/d", base=workdir)
263        fileutil.make_dirs_with_absolute_mode(workdir, abspath, 0o766)
264        new_mode = os.stat(os.path.join(workdir, "a", "b", "c", "d")).st_mode & 0o777
265        self.failUnlessEqual(new_mode, 0o766)
266        new_mode = os.stat(os.path.join(workdir, "a", "b", "c")).st_mode & 0o777
267        self.failUnlessEqual(new_mode, 0o766)
268        new_mode = os.stat(os.path.join(workdir, "a", "b")).st_mode & 0o777
269        self.failUnlessEqual(new_mode, 0o766)
270        new_mode = os.stat(os.path.join(workdir, "a")).st_mode & 0o777
271        self.failUnlessEqual(new_mode, 0o766)
272        new_mode = os.stat(workdir).st_mode & 0o777
273        self.failIfEqual(new_mode, 0o766)
274
275    def test_create_long_path(self):
276        """
277        Even for paths with total length greater than 260 bytes,
278        ``fileutil.abspath_expanduser_unicode`` produces a path on which other
279        path-related APIs can operate.
280
281        https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx
282        documents certain Windows-specific path length limitations this test
283        is specifically intended to demonstrate can be overcome.
284        """
285        workdir = u"test_create_long_path"
286        fileutil.make_dirs(workdir)
287        base_path = fileutil.abspath_expanduser_unicode(workdir)
288        base_length = len(base_path)
289
290        # Construct a path /just/ long enough to exercise the important case.
291        # It would be nice if we could just use a seemingly globally valid
292        # long file name (the `x...` portion) here - for example, a name 255
293        # bytes long- and a previous version of this test did just that.
294        # However, aufs imposes a 242 byte length limit on file names.  Most
295        # other POSIX filesystems do allow names up to 255 bytes.  It's not
296        # clear there's anything we can *do* about lower limits, though, and
297        # POSIX.1-2017 (and earlier) only requires that the maximum be at
298        # least 14 (!!!)  bytes.
299        long_path = os.path.join(base_path, u'x' * (261 - base_length))
300
301        def _cleanup():
302            fileutil.remove(long_path)
303        self.addCleanup(_cleanup)
304
305        fileutil.write(long_path, b"test")
306        self.failUnless(os.path.exists(long_path))
307        self.failUnlessEqual(fileutil.read(long_path), b"test")
308        _cleanup()
309        self.failIf(os.path.exists(long_path))
310
311    def _test_windows_expanduser(self, userprofile=None, homedrive=None, homepath=None):
312        def call_windows_getenv(name):
313            if name == u"USERPROFILE": return userprofile
314            if name == u"HOMEDRIVE":   return homedrive
315            if name == u"HOMEPATH":    return homepath
316            self.fail("unexpected argument to call_windows_getenv")
317        self.patch(fileutil, 'windows_getenv', call_windows_getenv)
318
319        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100"))
320        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~\\foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo"))
321        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~/foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo"))
322        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a"), u"a")
323        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a~"), u"a~")
324        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a\\~\\foo"), u"a\\~\\foo")
325
326    def test_windows_expanduser_xp(self):
327        return self._test_windows_expanduser(homedrive=u"C:", homepath=u"\\Documents and Settings\\\u0100")
328
329    def test_windows_expanduser_win7(self):
330        return self._test_windows_expanduser(userprofile=os.path.join(u"C:", u"\\Documents and Settings\\\u0100"))
331
332    def test_disk_stats(self):
333        avail = fileutil.get_available_space('.', 2**14)
334        if avail == 0:
335            raise unittest.SkipTest("This test will spuriously fail there is no disk space left.")
336
337        disk = fileutil.get_disk_stats('.', 2**13)
338        self.failUnless(disk['total'] > 0, disk['total'])
339        # we tolerate used==0 for a Travis-CI bug, see #2290
340        self.failUnless(disk['used'] >= 0, disk['used'])
341        self.failUnless(disk['free_for_root'] > 0, disk['free_for_root'])
342        self.failUnless(disk['free_for_nonroot'] > 0, disk['free_for_nonroot'])
343        self.failUnless(disk['avail'] > 0, disk['avail'])
344
345    def test_disk_stats_avail_nonnegative(self):
346        # This test will spuriously fail if you have more than 2^128
347        # bytes of available space on your filesystem.
348        disk = fileutil.get_disk_stats('.', 2**128)
349        self.failUnlessEqual(disk['avail'], 0)
350
351    def test_get_pathinfo(self):
352        basedir = "util/FileUtil/test_get_pathinfo"
353        fileutil.make_dirs(basedir)
354
355        # create a directory
356        self.mkdir(basedir, "a")
357        dirinfo = fileutil.get_pathinfo(basedir)
358        self.failUnlessTrue(dirinfo.isdir)
359        self.failUnlessTrue(dirinfo.exists)
360        self.failUnlessFalse(dirinfo.isfile)
361        self.failUnlessFalse(dirinfo.islink)
362
363        # create a file
364        f = os.path.join(basedir, "1.txt")
365        fileutil.write(f, b"a"*10)
366        fileinfo = fileutil.get_pathinfo(f)
367        self.failUnlessTrue(fileinfo.isfile)
368        self.failUnlessTrue(fileinfo.exists)
369        self.failUnlessFalse(fileinfo.isdir)
370        self.failUnlessFalse(fileinfo.islink)
371        self.failUnlessEqual(fileinfo.size, 10)
372
373        # path at which nothing exists
374        dnename = os.path.join(basedir, "doesnotexist")
375        now_ns = fileutil.seconds_to_ns(time.time())
376        dneinfo = fileutil.get_pathinfo(dnename, now_ns=now_ns)
377        self.failUnlessFalse(dneinfo.exists)
378        self.failUnlessFalse(dneinfo.isfile)
379        self.failUnlessFalse(dneinfo.isdir)
380        self.failUnlessFalse(dneinfo.islink)
381        self.failUnlessEqual(dneinfo.size, None)
382        self.failUnlessEqual(dneinfo.mtime_ns, now_ns)
383        self.failUnlessEqual(dneinfo.ctime_ns, now_ns)
384
385    def test_get_pathinfo_symlink(self):
386        if not hasattr(os, 'symlink'):
387            raise unittest.SkipTest("can't create symlinks on this platform")
388
389        basedir = "util/FileUtil/test_get_pathinfo"
390        fileutil.make_dirs(basedir)
391
392        f = os.path.join(basedir, "1.txt")
393        fileutil.write(f, b"a"*10)
394
395        # create a symlink pointing to 1.txt
396        slname = os.path.join(basedir, "linkto1.txt")
397        os.symlink(f, slname)
398        symlinkinfo = fileutil.get_pathinfo(slname)
399        self.failUnlessTrue(symlinkinfo.islink)
400        self.failUnlessTrue(symlinkinfo.exists)
401        self.failUnlessFalse(symlinkinfo.isfile)
402        self.failUnlessFalse(symlinkinfo.isdir)
403
404    def test_encrypted_tempfile(self):
405        f = EncryptedTemporaryFile()
406        f.write(b"foobar")
407        f.close()
408
409    def test_write(self):
410        """fileutil.write() can write both unicode and bytes."""
411        path = self.mktemp()
412        fileutil.write(path, b"abc")
413        with open(path, "rb") as f:
414            self.assertEqual(f.read(), b"abc")
415        fileutil.write(path, u"def \u1234")
416        with open(path, "rb") as f:
417            self.assertEqual(f.read(), u"def \u1234".encode("utf-8"))
418
419
420class PollMixinTests(unittest.TestCase):
421    def setUp(self):
422        self.pm = pollmixin.PollMixin()
423
424    def test_PollMixin_True(self):
425        d = self.pm.poll(check_f=lambda : True,
426                         pollinterval=0.1)
427        return d
428
429    def test_PollMixin_False_then_True(self):
430        i = iter([False, True])
431        d = self.pm.poll(check_f=lambda: next(i),
432                         pollinterval=0.1)
433        return d
434
435    def test_timeout(self):
436        d = self.pm.poll(check_f=lambda: False,
437                         pollinterval=0.01,
438                         timeout=1)
439        def _suc(res):
440            self.fail("poll should have failed, not returned %s" % (res,))
441        def _err(f):
442            f.trap(pollmixin.TimeoutError)
443            return None # success
444        d.addCallbacks(_suc, _err)
445        return d
446
447
448ctr = [0]
449class EqButNotIs(object):
450    def __init__(self, x):
451        self.x = x
452        self.hash = ctr[0]
453        ctr[0] += 1
454    def __repr__(self):
455        return "<%s %s>" % (self.__class__.__name__, self.x,)
456    def __hash__(self):
457        return self.hash
458    def __le__(self, other):
459        return self.x <= other
460    def __lt__(self, other):
461        return self.x < other
462    def __ge__(self, other):
463        return self.x >= other
464    def __gt__(self, other):
465        return self.x > other
466    def __ne__(self, other):
467        return self.x != other
468    def __eq__(self, other):
469        return self.x == other
470
471
472class YAML(unittest.TestCase):
473    def test_convert(self):
474        """
475        Unicode and (ASCII) native strings get roundtripped to Unicode strings.
476        """
477        data = yaml.safe_dump(
478            ["str", "unicode", "\u1234nicode"]
479        )
480        back = yamlutil.safe_load(data)
481        self.assertIsInstance(back[0], str)
482        self.assertIsInstance(back[1], str)
483        self.assertIsInstance(back[2], str)
484
485
486class JSONBytes(unittest.TestCase):
487    """Tests for jsonbytes module."""
488
489    def test_encode_bytes(self):
490        """jsonbytes.dumps() encodes bytes.
491
492        Bytes are presumed to be UTF-8 encoded.
493        """
494        snowman = u"def\N{SNOWMAN}\uFF00"
495        data = {
496            b"hello": [1, b"cd", {b"abc": [123, snowman.encode("utf-8")]}],
497        }
498        expected = {
499            u"hello": [1, u"cd", {u"abc": [123, snowman]}],
500        }
501        # Bytes get passed through as if they were UTF-8 Unicode:
502        encoded = jsonbytes.dumps(data)
503        self.assertEqual(json.loads(encoded), expected)
504        self.assertEqual(jsonbytes.loads(encoded), expected)
505
506    def test_encode_unicode(self):
507        """jsonbytes.dumps() encodes Unicode string as usual."""
508        expected = {
509            u"hello": [1, u"cd"],
510        }
511        encoded = jsonbytes.dumps(expected)
512        self.assertEqual(json.loads(encoded), expected)
513
514    def test_dumps_bytes(self):
515        """jsonbytes.dumps_bytes always returns bytes."""
516        x = {u"def\N{SNOWMAN}\uFF00": 123}
517        encoded = jsonbytes.dumps_bytes(x)
518        self.assertIsInstance(encoded, bytes)
519        self.assertEqual(json.loads(encoded), x)
520
521    def test_any_bytes_unsupported_by_default(self):
522        """By default non-UTF-8 bytes raise error."""
523        bytestring = b"abc\xff\x00"
524        with self.assertRaises(UnicodeDecodeError):
525            jsonbytes.dumps(bytestring)
526        with self.assertRaises(UnicodeDecodeError):
527            jsonbytes.dumps_bytes(bytestring)
528        with self.assertRaises(UnicodeDecodeError):
529            json.dumps(bytestring, cls=jsonbytes.UTF8BytesJSONEncoder)
530
531    def test_any_bytes(self):
532        """If any_bytes is True, non-UTF-8 bytes don't break encoding."""
533        bytestring = b"abc\xff\xff123"
534        o = {bytestring: bytestring}
535        expected = {"abc\\xff\\xff123": "abc\\xff\\xff123"}
536        self.assertEqual(
537            json.loads(jsonbytes.dumps(o, any_bytes=True)),
538            expected,
539        )
540        self.assertEqual(
541            json.loads(json.dumps(
542                o, cls=jsonbytes.AnyBytesJSONEncoder)),
543            expected,
544        )
545        self.assertEqual(
546            json.loads(jsonbytes.dumps(o, any_bytes=True)),
547            expected
548        )
549
550    def test_dumps_bytes_unicode_separators(self):
551        """Unicode separators don't prevent the result from being bytes."""
552        result = jsonbytes.dumps_bytes([1, 2], separators=(u',', u':'))
553        self.assertIsInstance(result, bytes)
554        self.assertEqual(result, b"[1,2]")
555
556
557
558class FakeGetVersion(object):
559    """Emulate an object with a get_version."""
560
561    def __init__(self, result):
562        self.result = result
563
564    def remote_get_version(self):
565        if isinstance(self.result, Exception):
566            raise self.result
567        return self.result
568
569
570class RrefUtilTests(unittest.TestCase):
571    """Tests for rrefutil."""
572
573    def test_version_returned(self):
574        """If get_version() succeeded, it is set on the rref."""
575        rref = LocalWrapper(FakeGetVersion(12345), fireNow)
576        result = self.successResultOf(
577            rrefutil.add_version_to_remote_reference(rref, "default")
578        )
579        self.assertEqual(result.version, 12345)
580        self.assertIdentical(result, rref)
581
582    def test_exceptions(self):
583        """If get_version() failed, default version is set on the rref."""
584        for exception in (Violation(), RemoteException(ValueError())):
585            rref = LocalWrapper(FakeGetVersion(exception), fireNow)
586            result = self.successResultOf(
587                rrefutil.add_version_to_remote_reference(rref, "Default")
588            )
589            self.assertEqual(result.version, "Default")
590            self.assertIdentical(result, rref)
591
592
593class CPUThreadPool(unittest.TestCase):
594    """Tests for cputhreadpool."""
595
596    async def test_runs_in_thread(self):
597        """The given function runs in a thread."""
598        def f(*args, **kwargs):
599            return current_thread(), args, kwargs
600
601        this_thread = current_thread().ident
602        thread, args, kwargs = await defer_to_thread(f, 1, 3, key=4, value=5)
603
604        # The task ran in a different thread:
605        self.assertNotEqual(thread.ident, this_thread)
606        self.assertEqual(args, (1, 3))
607        self.assertEqual(kwargs, {"key": 4, "value": 5})
608
609    async def test_when_disabled_runs_in_same_thread(self):
610        """
611        If the CPU thread pool is disabled, the given function runs in the
612        current thread.
613        """
614        disable_thread_pool_for_test(self)
615        def f(*args, **kwargs):
616            return current_thread().ident, args, kwargs
617
618        this_thread = current_thread().ident
619        thread, args, kwargs = await defer_to_thread(f, 1, 3, key=4, value=5)
620
621        self.assertEqual(thread, this_thread)
622        self.assertEqual(args, (1, 3))
623        self.assertEqual(kwargs, {"key": 4, "value": 5})
Note: See TracBrowser for help on using the repository browser.