1 | """ |
2 | Ported to Python3. |
3 | """ |
4 | |
5 | import os, time, sys |
6 | import yaml |
7 | import json |
8 | from threading import current_thread |
9 | |
10 | from twisted.trial import unittest |
11 | from foolscap.api import Violation, RemoteException |
12 | |
13 | from allmydata.util import idlib, mathutil |
14 | from allmydata.util import fileutil |
15 | from allmydata.util import jsonbytes |
16 | from allmydata.util import pollmixin |
17 | from allmydata.util import yamlutil |
18 | from allmydata.util import rrefutil |
19 | from allmydata.util.fileutil import EncryptedTemporaryFile |
20 | from allmydata.util.cputhreadpool import defer_to_thread, disable_thread_pool_for_test |
21 | from allmydata.test.common_util import ReallyEqualMixin |
22 | from .no_network import fireNow, LocalWrapper |
23 | |
# Bind the name ``long`` to ``int``. This looks like a leftover from the
# Python 3 port mentioned in the module docstring; nothing in the visible part
# of this module references it -- TODO confirm before removing.
long = int
25 | |
26 | |
class IDLib(unittest.TestCase):
    """Tests for ``allmydata.util.idlib``."""

    def test_nodeid_b2a(self):
        """Twenty zero bytes are rendered as a ``str`` of thirty-two "a"s."""
        zero_nodeid = b"\x00" * 20
        encoded = idlib.nodeid_b2a(zero_nodeid)
        self.assertEqual(encoded, "a" * 32)
        self.assertIsInstance(encoded, str)
32 | |
33 | |
class MyList(list):
    """A ``list`` subclass that adds no behavior of its own."""
36 | |
class Math(unittest.TestCase):
    """Tests for ``allmydata.util.mathutil``."""

    def test_round_sigfigs(self):
        """22/3 rounded to four significant figures is 7.333."""
        rounded = mathutil.round_sigfigs(22.0 / 3, 4)
        self.assertEqual(rounded, 7.3330000000000002)
41 | |
42 | |
class FileUtil(ReallyEqualMixin, unittest.TestCase):
    """Tests for ``allmydata.util.fileutil``."""

    def mkdir(self, basedir, path, mode=0o777):
        """Create the directory ``basedir/path`` with ``fileutil.make_dirs``."""
        fn = os.path.join(basedir, path)
        fileutil.make_dirs(fn, mode)

    def touch(self, basedir, path, mode=None, data="touch\n"):
        """
        Write ``data`` (text) to ``basedir/path``; if ``mode`` is not None,
        chmod the file to ``mode`` afterwards.
        """
        fn = os.path.join(basedir, path)
        # The context manager closes the handle even if write() raises.
        with open(fn, "w") as f:
            f.write(data)
        if mode is not None:
            os.chmod(fn, mode)

    def test_rm_dir(self):
        """
        ``rm_dir`` removes a tree holding read-only and mode-0 files and
        directories, and can be called again once the tree is gone.
        """
        basedir = "util/FileUtil/test_rm_dir"
        fileutil.make_dirs(basedir)
        # create it again to test idempotency
        fileutil.make_dirs(basedir)
        d = os.path.join(basedir, "doomed")
        self.mkdir(d, "a/b")
        self.touch(d, "a/b/1.txt")
        self.touch(d, "a/b/2.txt", 0o444)
        self.touch(d, "a/b/3.txt", 0)
        self.mkdir(d, "a/c")
        self.touch(d, "a/c/1.txt")
        self.touch(d, "a/c/2.txt", 0o444)
        self.touch(d, "a/c/3.txt", 0)
        # Permissions on the directories are tightened only after their
        # contents have been created.
        os.chmod(os.path.join(d, "a/c"), 0o444)
        self.mkdir(d, "a/d")
        self.touch(d, "a/d/1.txt")
        self.touch(d, "a/d/2.txt", 0o444)
        self.touch(d, "a/d/3.txt", 0)
        os.chmod(os.path.join(d, "a/d"), 0)

        fileutil.rm_dir(d)
        self.assertFalse(os.path.exists(d))
        # remove it again to test idempotency
        fileutil.rm_dir(d)

    def test_remove_if_possible(self):
        """
        ``remove_if_possible`` deletes an existing file and does not raise
        when the file, or its whole parent directory, is already gone.
        """
        basedir = "util/FileUtil/test_remove_if_possible"
        fileutil.make_dirs(basedir)
        self.touch(basedir, "here")
        fn = os.path.join(basedir, "here")
        fileutil.remove_if_possible(fn)
        self.assertFalse(os.path.exists(fn))
        fileutil.remove_if_possible(fn)  # should be idempotent
        fileutil.rm_dir(basedir)
        fileutil.remove_if_possible(fn)  # should survive errors

    def test_write_atomically(self):
        """``write_atomically`` handles both the binary and the text mode."""
        basedir = "util/FileUtil/test_write_atomically"
        fileutil.make_dirs(basedir)
        fn = os.path.join(basedir, "here")
        fileutil.write_atomically(fn, b"one", "b")
        self.assertEqual(fileutil.read(fn), b"one")
        fileutil.write_atomically(fn, u"two", mode="")  # non-binary
        self.assertEqual(fileutil.read(fn), b"two")

    def test_rename(self):
        """After ``rename`` the source path is gone and the target exists."""
        basedir = "util/FileUtil/test_rename"
        fileutil.make_dirs(basedir)
        self.touch(basedir, "here")
        fn = os.path.join(basedir, "here")
        fn2 = os.path.join(basedir, "there")
        fileutil.rename(fn, fn2)
        self.assertFalse(os.path.exists(fn))
        self.assertTrue(os.path.exists(fn2))

    def test_rename_no_overwrite(self):
        """
        ``rename_no_overwrite`` raises ``OSError`` unless the source exists
        and the destination does not, and leaves existing files untouched
        when it fails.
        """
        workdir = fileutil.abspath_expanduser_unicode(u"test_rename_no_overwrite")
        fileutil.make_dirs(workdir)

        source_path = os.path.join(workdir, "source")
        dest_path = os.path.join(workdir, "dest")

        # when neither file exists
        self.assertRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)

        # when only dest exists
        fileutil.write(dest_path, b"dest")
        self.assertRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
        self.assertEqual(fileutil.read(dest_path), b"dest")

        # when both exist
        fileutil.write(source_path, b"source")
        self.assertRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
        self.assertEqual(fileutil.read(source_path), b"source")
        self.assertEqual(fileutil.read(dest_path), b"dest")

        # when only source exists
        os.remove(dest_path)
        fileutil.rename_no_overwrite(source_path, dest_path)
        self.assertEqual(fileutil.read(dest_path), b"source")
        self.assertFalse(os.path.exists(source_path))

    def test_replace_file(self):
        """
        ``replace_file`` raises ``ConflictError`` when the replacement is
        missing; otherwise the replacement's content ends up at the replaced
        path and the replacement path no longer exists.
        """
        workdir = fileutil.abspath_expanduser_unicode(u"test_replace_file")
        fileutil.make_dirs(workdir)

        replaced_path = os.path.join(workdir, "replaced")
        replacement_path = os.path.join(workdir, "replacement")

        # when none of the files exist
        self.assertRaises(fileutil.ConflictError, fileutil.replace_file, replaced_path, replacement_path)

        # when only replaced exists
        fileutil.write(replaced_path, b"foo")
        self.assertRaises(fileutil.ConflictError, fileutil.replace_file, replaced_path, replacement_path)
        self.assertEqual(fileutil.read(replaced_path), b"foo")

        # when both replaced and replacement exist
        fileutil.write(replacement_path, b"bar")
        fileutil.replace_file(replaced_path, replacement_path)
        self.assertEqual(fileutil.read(replaced_path), b"bar")
        self.assertFalse(os.path.exists(replacement_path))

        # when only replacement exists
        os.remove(replaced_path)
        fileutil.write(replacement_path, b"bar")
        fileutil.replace_file(replaced_path, replacement_path)
        self.assertEqual(fileutil.read(replaced_path), b"bar")
        self.assertFalse(os.path.exists(replacement_path))

    def test_du(self):
        """``du`` reports the summed size of the files below a directory."""
        basedir = "util/FileUtil/test_du"
        fileutil.make_dirs(basedir)
        d = os.path.join(basedir, "space-consuming")
        self.mkdir(d, "a/b")
        self.touch(d, "a/b/1.txt", data="a"*10)
        self.touch(d, "a/b/2.txt", data="b"*11)
        self.mkdir(d, "a/c")
        self.touch(d, "a/c/1.txt", data="c"*12)
        self.touch(d, "a/c/2.txt", data="d"*13)

        used = fileutil.du(basedir)
        self.assertEqual(10+11+12+13, used)

    def test_abspath_expanduser_unicode(self):
        """
        Exercise ``abspath_expanduser_unicode`` (with and without
        ``long_path``) and ``to_windows_long_path`` on relative paths, ``~``,
        Windows-style paths, and from inside ASCII and non-ASCII working
        directories.
        """
        self.assertRaises(AssertionError, fileutil.abspath_expanduser_unicode, b"bytestring")

        saved_cwd = os.path.normpath(os.getcwd())
        abspath_cwd = fileutil.abspath_expanduser_unicode(u".")
        abspath_cwd_notlong = fileutil.abspath_expanduser_unicode(u".", long_path=False)
        self.assertTrue(isinstance(saved_cwd, str), saved_cwd)
        self.assertTrue(isinstance(abspath_cwd, str), abspath_cwd)
        if sys.platform == "win32":
            self.failUnlessReallyEqual(abspath_cwd, fileutil.to_windows_long_path(saved_cwd))
        else:
            self.failUnlessReallyEqual(abspath_cwd, saved_cwd)
        self.failUnlessReallyEqual(abspath_cwd_notlong, saved_cwd)

        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\?\\foo"), u"\\\\?\\foo")
        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\.\\foo"), u"\\\\.\\foo")
        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\server\\foo"), u"\\\\?\\UNC\\server\\foo")
        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo"), u"\\\\?\\C:\\foo")
        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo/bar"), u"\\\\?\\C:\\foo\\bar")

        # adapted from <http://svn.python.org/view/python/branches/release26-maint/Lib/test/test_posixpath.py?view=markup&pathrev=78279#test_abspath>

        foo = fileutil.abspath_expanduser_unicode(u"foo")
        self.assertTrue(foo.endswith(u"%sfoo" % (os.path.sep,)), foo)

        foobar = fileutil.abspath_expanduser_unicode(u"bar", base=foo)
        self.assertTrue(foobar.endswith(u"%sfoo%sbar" % (os.path.sep, os.path.sep)), foobar)

        if sys.platform == "win32":
            # This is checking that a drive letter is added for a path without one.
            baz = fileutil.abspath_expanduser_unicode(u"\\baz")
            self.assertTrue(baz.startswith(u"\\\\?\\"), baz)
            self.failUnlessReallyEqual(baz[5:], u":\\baz")

            bar = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz)
            self.assertTrue(bar.startswith(u"\\\\?\\"), bar)
            self.failUnlessReallyEqual(bar[5:], u":\\bar")
            # not u":\\baz\\bar", because \bar is absolute on the current drive.

            self.failUnlessReallyEqual(baz[4], bar[4])  # same drive

            baz_notlong = fileutil.abspath_expanduser_unicode(u"\\baz", long_path=False)
            self.assertFalse(baz_notlong.startswith(u"\\\\?\\"), baz_notlong)
            self.failUnlessReallyEqual(baz_notlong[1:], u":\\baz")

            bar_notlong = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz_notlong, long_path=False)
            self.assertFalse(bar_notlong.startswith(u"\\\\?\\"), bar_notlong)
            self.failUnlessReallyEqual(bar_notlong[1:], u":\\bar")
            # not u":\\baz\\bar", because \bar is absolute on the current drive.

            self.failUnlessReallyEqual(baz_notlong[0], bar_notlong[0])  # same drive

        self.assertNotIn(u"~", fileutil.abspath_expanduser_unicode(u"~"))
        self.assertNotIn(u"~", fileutil.abspath_expanduser_unicode(u"~", long_path=False))

        cwds = ['cwd']
        try:
            cwds.append(u'\xe7w\xf0'.encode(sys.getfilesystemencoding()
                                            or 'ascii'))
        except UnicodeEncodeError:
            pass  # the cwd can't be encoded -- test with ascii cwd only

        for cwd in cwds:
            try:
                os.mkdir(cwd)
                os.chdir(cwd)
                for upath in (u'', u'fuu', u'f\xf9\xf9', u'/fuu', u'U:\\', u'~'):
                    uabspath = fileutil.abspath_expanduser_unicode(upath)
                    self.assertTrue(isinstance(uabspath, str), uabspath)

                    uabspath_notlong = fileutil.abspath_expanduser_unicode(upath, long_path=False)
                    self.assertTrue(isinstance(uabspath_notlong, str), uabspath_notlong)
            finally:
                # Always return to the starting directory so later tests
                # are not affected by the chdir above.
                os.chdir(saved_cwd)

    def test_make_dirs_with_absolute_mode(self):
        """
        Every directory created below ``workdir`` gets mode 0o766, while
        ``workdir`` itself is expected to have some other mode.
        """
        if sys.platform == 'win32':
            raise unittest.SkipTest("Permissions don't work the same on windows.")

        workdir = fileutil.abspath_expanduser_unicode(u"test_make_dirs_with_absolute_mode")
        fileutil.make_dirs(workdir)
        abspath = fileutil.abspath_expanduser_unicode(u"a/b/c/d", base=workdir)
        fileutil.make_dirs_with_absolute_mode(workdir, abspath, 0o766)
        # Check the deepest directory first, then each ancestor up to "a".
        for parts in (("a", "b", "c", "d"), ("a", "b", "c"), ("a", "b"), ("a",)):
            new_mode = os.stat(os.path.join(workdir, *parts)).st_mode & 0o777
            self.assertEqual(new_mode, 0o766)
        new_mode = os.stat(workdir).st_mode & 0o777
        self.assertNotEqual(new_mode, 0o766)

    def test_create_long_path(self):
        """
        Even for paths with total length greater than 260 bytes,
        ``fileutil.abspath_expanduser_unicode`` produces a path on which other
        path-related APIs can operate.

        https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx
        documents certain Windows-specific path length limitations this test
        is specifically intended to demonstrate can be overcome.
        """
        workdir = u"test_create_long_path"
        fileutil.make_dirs(workdir)
        base_path = fileutil.abspath_expanduser_unicode(workdir)
        base_length = len(base_path)

        # Construct a path /just/ long enough to exercise the important case.
        # It would be nice if we could just use a seemingly globally valid
        # long file name (the `x...` portion) here - for example, a name 255
        # bytes long- and a previous version of this test did just that.
        # However, aufs imposes a 242 byte length limit on file names.  Most
        # other POSIX filesystems do allow names up to 255 bytes.  It's not
        # clear there's anything we can *do* about lower limits, though, and
        # POSIX.1-2017 (and earlier) only requires that the maximum be at
        # least 14 (!!!) bytes.
        long_path = os.path.join(base_path, u'x' * (261 - base_length))

        def _cleanup():
            fileutil.remove(long_path)
        self.addCleanup(_cleanup)

        fileutil.write(long_path, b"test")
        self.assertTrue(os.path.exists(long_path))
        self.assertEqual(fileutil.read(long_path), b"test")
        _cleanup()
        self.assertFalse(os.path.exists(long_path))

    def _test_windows_expanduser(self, userprofile=None, homedrive=None, homepath=None):
        """
        Patch ``fileutil.windows_getenv`` to return the given values for
        USERPROFILE, HOMEDRIVE and HOMEPATH, then check how
        ``windows_expanduser`` treats ``~`` in various positions.
        """
        def call_windows_getenv(name):
            if name == u"USERPROFILE": return userprofile
            if name == u"HOMEDRIVE":   return homedrive
            if name == u"HOMEPATH":    return homepath
            self.fail("unexpected argument to call_windows_getenv")
        self.patch(fileutil, 'windows_getenv', call_windows_getenv)

        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100"))
        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~\\foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo"))
        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~/foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo"))
        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a"), u"a")
        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a~"), u"a~")
        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a\\~\\foo"), u"a\\~\\foo")

    def test_windows_expanduser_xp(self):
        """Home directory supplied through HOMEDRIVE plus HOMEPATH."""
        return self._test_windows_expanduser(homedrive=u"C:", homepath=u"\\Documents and Settings\\\u0100")

    def test_windows_expanduser_win7(self):
        """Home directory supplied through USERPROFILE."""
        return self._test_windows_expanduser(userprofile=os.path.join(u"C:", u"\\Documents and Settings\\\u0100"))

    def test_disk_stats(self):
        """``get_disk_stats`` reports positive totals when space is available."""
        avail = fileutil.get_available_space('.', 2**14)
        if avail == 0:
            raise unittest.SkipTest("This test will spuriously fail there is no disk space left.")

        disk = fileutil.get_disk_stats('.', 2**13)
        self.assertTrue(disk['total'] > 0, disk['total'])
        # we tolerate used==0 for a Travis-CI bug, see #2290
        self.assertTrue(disk['used'] >= 0, disk['used'])
        self.assertTrue(disk['free_for_root'] > 0, disk['free_for_root'])
        self.assertTrue(disk['free_for_nonroot'] > 0, disk['free_for_nonroot'])
        self.assertTrue(disk['avail'] > 0, disk['avail'])

    def test_disk_stats_avail_nonnegative(self):
        """With a huge second argument, ``avail`` is reported as 0."""
        # This test will spuriously fail if you have more than 2^128
        # bytes of available space on your filesystem.
        disk = fileutil.get_disk_stats('.', 2**128)
        self.assertEqual(disk['avail'], 0)

    def test_get_pathinfo(self):
        """``get_pathinfo`` on a directory, a regular file, and a missing path."""
        basedir = "util/FileUtil/test_get_pathinfo"
        fileutil.make_dirs(basedir)

        # create a directory
        self.mkdir(basedir, "a")
        dirinfo = fileutil.get_pathinfo(basedir)
        self.assertTrue(dirinfo.isdir)
        self.assertTrue(dirinfo.exists)
        self.assertFalse(dirinfo.isfile)
        self.assertFalse(dirinfo.islink)

        # create a file
        f = os.path.join(basedir, "1.txt")
        fileutil.write(f, b"a"*10)
        fileinfo = fileutil.get_pathinfo(f)
        self.assertTrue(fileinfo.isfile)
        self.assertTrue(fileinfo.exists)
        self.assertFalse(fileinfo.isdir)
        self.assertFalse(fileinfo.islink)
        self.assertEqual(fileinfo.size, 10)

        # path at which nothing exists
        dnename = os.path.join(basedir, "doesnotexist")
        now_ns = fileutil.seconds_to_ns(time.time())
        dneinfo = fileutil.get_pathinfo(dnename, now_ns=now_ns)
        self.assertFalse(dneinfo.exists)
        self.assertFalse(dneinfo.isfile)
        self.assertFalse(dneinfo.isdir)
        self.assertFalse(dneinfo.islink)
        self.assertEqual(dneinfo.size, None)
        # For a missing path the supplied ``now_ns`` is echoed back as both
        # timestamps.
        self.assertEqual(dneinfo.mtime_ns, now_ns)
        self.assertEqual(dneinfo.ctime_ns, now_ns)

    def test_get_pathinfo_symlink(self):
        """A symlink to a file is reported as a link, not as a file or directory."""
        if not hasattr(os, 'symlink'):
            raise unittest.SkipTest("can't create symlinks on this platform")

        basedir = "util/FileUtil/test_get_pathinfo"
        fileutil.make_dirs(basedir)

        f = os.path.join(basedir, "1.txt")
        fileutil.write(f, b"a"*10)

        # create a symlink pointing to 1.txt
        slname = os.path.join(basedir, "linkto1.txt")
        os.symlink(f, slname)
        symlinkinfo = fileutil.get_pathinfo(slname)
        self.assertTrue(symlinkinfo.islink)
        self.assertTrue(symlinkinfo.exists)
        self.assertFalse(symlinkinfo.isfile)
        self.assertFalse(symlinkinfo.isdir)

    def test_encrypted_tempfile(self):
        """An ``EncryptedTemporaryFile`` can be written to and closed."""
        f = EncryptedTemporaryFile()
        f.write(b"foobar")
        f.close()

    def test_write(self):
        """fileutil.write() can write both unicode and bytes."""
        path = self.mktemp()
        fileutil.write(path, b"abc")
        with open(path, "rb") as f:
            self.assertEqual(f.read(), b"abc")
        fileutil.write(path, u"def \u1234")
        with open(path, "rb") as f:
            self.assertEqual(f.read(), u"def \u1234".encode("utf-8"))
418 | |
419 | |
class PollMixinTests(unittest.TestCase):
    """Tests for ``pollmixin.PollMixin.poll``."""

    def setUp(self):
        self.pm = pollmixin.PollMixin()

    def test_PollMixin_True(self):
        """Poll a check that is true on the first call."""
        return self.pm.poll(check_f=lambda: True, pollinterval=0.1)

    def test_PollMixin_False_then_True(self):
        """Poll a check that answers False once and then True."""
        answers = iter([False, True])
        return self.pm.poll(check_f=lambda: next(answers), pollinterval=0.1)

    def test_timeout(self):
        """A check that never becomes true fails with ``pollmixin.TimeoutError``."""
        def _unexpected_success(res):
            self.fail("poll should have failed, not returned %s" % (res,))

        def _expected_failure(f):
            # trap() re-raises anything that is not the expected timeout.
            f.trap(pollmixin.TimeoutError)
            return None  # success

        d = self.pm.poll(check_f=lambda: False, pollinterval=0.01, timeout=1)
        d.addCallbacks(_unexpected_success, _expected_failure)
        return d
446 | |
447 | |
ctr = [0]


class EqButNotIs(object):
    """
    Wrap a value ``x`` so that every comparison is delegated to ``x``, while
    each instance takes its hash from the module-level counter ``ctr``.
    """

    def __init__(self, x):
        self.x = x
        # Use the current counter value as this instance's hash, then
        # advance the counter for the next instance.
        serial = ctr[0]
        ctr[0] = serial + 1
        self.hash = serial

    def __repr__(self):
        return "<%s %s>" % (type(self).__name__, self.x)

    def __hash__(self):
        return self.hash

    def __eq__(self, other):
        return self.x == other

    def __ne__(self, other):
        return self.x != other

    def __lt__(self, other):
        return self.x < other

    def __le__(self, other):
        return self.x <= other

    def __gt__(self, other):
        return self.x > other

    def __ge__(self, other):
        return self.x >= other
470 | |
471 | |
class YAML(unittest.TestCase):
    """Tests for ``allmydata.util.yamlutil``."""

    def test_convert(self):
        """
        Unicode and (ASCII) native strings get roundtripped to Unicode strings.
        """
        dumped = yaml.safe_dump(["str", "unicode", "\u1234nicode"])
        loaded = yamlutil.safe_load(dumped)
        for index in (0, 1, 2):
            self.assertIsInstance(loaded[index], str)
484 | |
485 | |
class JSONBytes(unittest.TestCase):
    """Tests for jsonbytes module."""

    def test_encode_bytes(self):
        """jsonbytes.dumps() encodes bytes.

        Bytes are presumed to be UTF-8 encoded.
        """
        snowman = u"def\N{SNOWMAN}\uFF00"
        data = {
            b"hello": [1, b"cd", {b"abc": [123, snowman.encode("utf-8")]}],
        }
        expected = {
            u"hello": [1, u"cd", {u"abc": [123, snowman]}],
        }
        # Bytes get passed through as if they were UTF-8 Unicode:
        encoded = jsonbytes.dumps(data)
        self.assertEqual(json.loads(encoded), expected)
        self.assertEqual(jsonbytes.loads(encoded), expected)

    def test_encode_unicode(self):
        """jsonbytes.dumps() encodes Unicode string as usual."""
        expected = {
            u"hello": [1, u"cd"],
        }
        encoded = jsonbytes.dumps(expected)
        self.assertEqual(json.loads(encoded), expected)

    def test_dumps_bytes(self):
        """jsonbytes.dumps_bytes always returns bytes."""
        x = {u"def\N{SNOWMAN}\uFF00": 123}
        encoded = jsonbytes.dumps_bytes(x)
        self.assertIsInstance(encoded, bytes)
        self.assertEqual(json.loads(encoded), x)

    def test_any_bytes_unsupported_by_default(self):
        """By default non-UTF-8 bytes raise error."""
        bytestring = b"abc\xff\x00"
        with self.assertRaises(UnicodeDecodeError):
            jsonbytes.dumps(bytestring)
        with self.assertRaises(UnicodeDecodeError):
            jsonbytes.dumps_bytes(bytestring)
        with self.assertRaises(UnicodeDecodeError):
            json.dumps(bytestring, cls=jsonbytes.UTF8BytesJSONEncoder)

    def test_any_bytes(self):
        """If any_bytes is True, non-UTF-8 bytes don't break encoding."""
        bytestring = b"abc\xff\xff123"
        o = {bytestring: bytestring}
        expected = {"abc\\xff\\xff123": "abc\\xff\\xff123"}
        self.assertEqual(
            json.loads(jsonbytes.dumps(o, any_bytes=True)),
            expected,
        )
        self.assertEqual(
            json.loads(json.dumps(
                o, cls=jsonbytes.AnyBytesJSONEncoder)),
            expected,
        )
        # Cover the bytes-returning entry point as well, mirroring the three
        # entry points checked in test_any_bytes_unsupported_by_default.
        # NOTE(review): assumes dumps_bytes() forwards ``any_bytes`` to
        # dumps() -- confirm against allmydata.util.jsonbytes.
        self.assertEqual(
            json.loads(jsonbytes.dumps_bytes(o, any_bytes=True)),
            expected,
        )

    def test_dumps_bytes_unicode_separators(self):
        """Unicode separators don't prevent the result from being bytes."""
        result = jsonbytes.dumps_bytes([1, 2], separators=(u',', u':'))
        self.assertIsInstance(result, bytes)
        self.assertEqual(result, b"[1,2]")
555 | |
556 | |
557 | |
class FakeGetVersion(object):
    """Emulate an object with a get_version."""

    def __init__(self, result):
        # Either the value to hand back, or an Exception instance to raise.
        self.result = result

    def remote_get_version(self):
        """Return the stored result, or raise it if it is an ``Exception``."""
        outcome = self.result
        if not isinstance(outcome, Exception):
            return outcome
        raise outcome
568 | |
569 | |
class RrefUtilTests(unittest.TestCase):
    """Tests for rrefutil."""

    def test_version_returned(self):
        """If get_version() succeeded, it is set on the rref."""
        rref = LocalWrapper(FakeGetVersion(12345), fireNow)
        d = rrefutil.add_version_to_remote_reference(rref, "default")
        result = self.successResultOf(d)
        self.assertEqual(result.version, 12345)
        self.assertIdentical(result, rref)

    def test_exceptions(self):
        """If get_version() failed, default version is set on the rref."""
        failures = (Violation(), RemoteException(ValueError()))
        for exception in failures:
            rref = LocalWrapper(FakeGetVersion(exception), fireNow)
            d = rrefutil.add_version_to_remote_reference(rref, "Default")
            result = self.successResultOf(d)
            self.assertEqual(result.version, "Default")
            self.assertIdentical(result, rref)
591 | |
592 | |
class CPUThreadPool(unittest.TestCase):
    """Tests for cputhreadpool."""

    async def test_runs_in_thread(self):
        """The given function runs in a thread."""
        def report(*args, **kwargs):
            return current_thread(), args, kwargs

        caller_ident = current_thread().ident
        worker, got_args, got_kwargs = await defer_to_thread(
            report, 1, 3, key=4, value=5
        )

        # The task ran in a different thread:
        self.assertNotEqual(worker.ident, caller_ident)
        self.assertEqual(got_args, (1, 3))
        self.assertEqual(got_kwargs, {"key": 4, "value": 5})

    async def test_when_disabled_runs_in_same_thread(self):
        """
        If the CPU thread pool is disabled, the given function runs in the
        current thread.
        """
        disable_thread_pool_for_test(self)

        def report(*args, **kwargs):
            return current_thread().ident, args, kwargs

        caller_ident = current_thread().ident
        worker_ident, got_args, got_kwargs = await defer_to_thread(
            report, 1, 3, key=4, value=5
        )

        self.assertEqual(worker_ident, caller_ident)
        self.assertEqual(got_args, (1, 3))
        self.assertEqual(got_kwargs, {"key": 4, "value": 5})