1 | """ |
---|
2 | Ported to Python3. |
---|
3 | """ |
---|
4 | |
---|
5 | import os, time, sys |
---|
6 | import yaml |
---|
7 | import json |
---|
8 | from threading import current_thread |
---|
9 | |
---|
10 | from twisted.trial import unittest |
---|
11 | from foolscap.api import Violation, RemoteException |
---|
12 | |
---|
13 | from allmydata.util import idlib, mathutil |
---|
14 | from allmydata.util import fileutil |
---|
15 | from allmydata.util import jsonbytes |
---|
16 | from allmydata.util import pollmixin |
---|
17 | from allmydata.util import yamlutil |
---|
18 | from allmydata.util import rrefutil |
---|
19 | from allmydata.util.fileutil import EncryptedTemporaryFile |
---|
20 | from allmydata.util.cputhreadpool import defer_to_thread, disable_thread_pool_for_test |
---|
21 | from allmydata.test.common_util import ReallyEqualMixin |
---|
22 | from .no_network import fireNow, LocalWrapper |
---|
23 | |
---|
# Alias the name ``long`` to the built-in ``int``.
# NOTE(review): presumably a leftover from the Python 2 -> 3 port mentioned in
# the module docstring; nothing in the visible part of this file uses it --
# confirm before removing.
long = int
---|
25 | |
---|
26 | |
---|
class IDLib(unittest.TestCase):
    """Tests for ``allmydata.util.idlib``."""

    def test_nodeid_b2a(self):
        """
        Twenty zero bytes are rendered by ``nodeid_b2a`` as a native string
        made of thirty-two ``a`` characters.
        """
        zero_nodeid = b"\x00" * 20
        expected_text = "a" * 32

        encoded = idlib.nodeid_b2a(zero_nodeid)

        self.assertEqual(encoded, expected_text)
        self.assertIsInstance(encoded, str)
---|
32 | |
---|
33 | |
---|
class MyList(list):
    """A ``list`` subclass that adds no behaviour of its own."""
---|
36 | |
---|
class Math(unittest.TestCase):
    """Tests for ``allmydata.util.mathutil``."""

    def test_round_sigfigs(self):
        """22/3 rounded to four significant figures compares equal to 7.333."""
        rounded = mathutil.round_sigfigs(22.0 / 3, 4)
        self.assertEqual(rounded, 7.3330000000000002)
---|
41 | |
---|
42 | |
---|
class FileUtil(ReallyEqualMixin, unittest.TestCase):
    """Tests for ``allmydata.util.fileutil``."""

    def mkdir(self, basedir, path, mode=0o777):
        """Create the directory ``path`` below ``basedir`` via ``fileutil.make_dirs``."""
        fn = os.path.join(basedir, path)
        fileutil.make_dirs(fn, mode)

    def touch(self, basedir, path, mode=None, data="touch\n"):
        """
        Write ``data`` (text mode) to ``path`` below ``basedir``, then chmod
        the file to ``mode`` unless ``mode`` is None.
        """
        fn = os.path.join(basedir, path)
        # The context manager closes the handle even if the write fails.
        with open(fn, "w") as f:
            f.write(data)
        if mode is not None:
            os.chmod(fn, mode)

    def test_rm_dir(self):
        """
        ``rm_dir`` removes a tree containing read-only / mode-0 files and
        directories, and can be called again once the tree is gone.
        """
        basedir = "util/FileUtil/test_rm_dir"
        fileutil.make_dirs(basedir)
        # create it again to test idempotency
        fileutil.make_dirs(basedir)
        d = os.path.join(basedir, "doomed")

        # Each subdirectory gets the same three files (default mode, 0o444,
        # mode 0); afterwards the directory itself is optionally chmod'ed.
        for subdir, dir_mode in (("a/b", None), ("a/c", 0o444), ("a/d", 0)):
            self.mkdir(d, subdir)
            self.touch(d, subdir + "/1.txt")
            self.touch(d, subdir + "/2.txt", 0o444)
            self.touch(d, subdir + "/3.txt", 0)
            if dir_mode is not None:
                os.chmod(os.path.join(d, subdir), dir_mode)

        fileutil.rm_dir(d)
        self.assertFalse(os.path.exists(d))
        # remove it again to test idempotency
        fileutil.rm_dir(d)

    def test_remove_if_possible(self):
        """
        ``remove_if_possible`` deletes an existing file and does not raise
        when the file, or even its parent directory, is already gone.
        """
        basedir = "util/FileUtil/test_remove_if_possible"
        fileutil.make_dirs(basedir)
        self.touch(basedir, "here")
        fn = os.path.join(basedir, "here")
        fileutil.remove_if_possible(fn)
        self.assertFalse(os.path.exists(fn))
        fileutil.remove_if_possible(fn)  # should be idempotent
        fileutil.rm_dir(basedir)
        fileutil.remove_if_possible(fn)  # should survive errors

    def test_write_atomically(self):
        """``write_atomically`` stores both bytes ("b" mode) and text ("" mode)."""
        basedir = "util/FileUtil/test_write_atomically"
        fileutil.make_dirs(basedir)
        fn = os.path.join(basedir, "here")
        fileutil.write_atomically(fn, b"one", "b")
        self.assertEqual(fileutil.read(fn), b"one")
        fileutil.write_atomically(fn, u"two", mode="")  # non-binary
        self.assertEqual(fileutil.read(fn), b"two")

    def test_rename(self):
        """After ``rename`` the source path is gone and the target path exists."""
        basedir = "util/FileUtil/test_rename"
        fileutil.make_dirs(basedir)
        self.touch(basedir, "here")
        fn = os.path.join(basedir, "here")
        fn2 = os.path.join(basedir, "there")
        fileutil.rename(fn, fn2)
        self.assertFalse(os.path.exists(fn))
        self.assertTrue(os.path.exists(fn2))

    def test_rename_no_overwrite(self):
        """
        ``rename_no_overwrite`` raises ``OSError`` unless the source exists
        and the destination does not; a refused rename leaves both files
        untouched.
        """
        workdir = fileutil.abspath_expanduser_unicode(u"test_rename_no_overwrite")
        fileutil.make_dirs(workdir)

        source_path = os.path.join(workdir, "source")
        dest_path = os.path.join(workdir, "dest")

        # when neither file exists
        self.assertRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)

        # when only dest exists
        fileutil.write(dest_path, b"dest")
        self.assertRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
        self.assertEqual(fileutil.read(dest_path), b"dest")

        # when both exist
        fileutil.write(source_path, b"source")
        self.assertRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
        self.assertEqual(fileutil.read(source_path), b"source")
        self.assertEqual(fileutil.read(dest_path), b"dest")

        # when only source exists
        os.remove(dest_path)
        fileutil.rename_no_overwrite(source_path, dest_path)
        self.assertEqual(fileutil.read(dest_path), b"source")
        self.assertFalse(os.path.exists(source_path))

    def test_replace_file(self):
        """
        ``replace_file`` raises ``ConflictError`` when the replacement file is
        missing; otherwise the replacement's content ends up at the replaced
        path and the replacement path no longer exists.
        """
        workdir = fileutil.abspath_expanduser_unicode(u"test_replace_file")
        fileutil.make_dirs(workdir)

        replaced_path = os.path.join(workdir, "replaced")
        replacement_path = os.path.join(workdir, "replacement")

        # when none of the files exist
        self.assertRaises(fileutil.ConflictError, fileutil.replace_file, replaced_path, replacement_path)

        # when only replaced exists
        fileutil.write(replaced_path, b"foo")
        self.assertRaises(fileutil.ConflictError, fileutil.replace_file, replaced_path, replacement_path)
        self.assertEqual(fileutil.read(replaced_path), b"foo")

        # when both replaced and replacement exist
        fileutil.write(replacement_path, b"bar")
        fileutil.replace_file(replaced_path, replacement_path)
        self.assertEqual(fileutil.read(replaced_path), b"bar")
        self.assertFalse(os.path.exists(replacement_path))

        # when only replacement exists
        os.remove(replaced_path)
        fileutil.write(replacement_path, b"bar")
        fileutil.replace_file(replaced_path, replacement_path)
        self.assertEqual(fileutil.read(replaced_path), b"bar")
        self.assertFalse(os.path.exists(replacement_path))

    def test_du(self):
        """``du`` reports the summed size of the files written below a directory."""
        basedir = "util/FileUtil/test_du"
        fileutil.make_dirs(basedir)
        d = os.path.join(basedir, "space-consuming")
        self.mkdir(d, "a/b")
        self.touch(d, "a/b/1.txt", data="a" * 10)
        self.touch(d, "a/b/2.txt", data="b" * 11)
        self.mkdir(d, "a/c")
        self.touch(d, "a/c/1.txt", data="c" * 12)
        self.touch(d, "a/c/2.txt", data="d" * 13)

        used = fileutil.du(basedir)
        self.assertEqual(10 + 11 + 12 + 13, used)

    def test_abspath_expanduser_unicode(self):
        """
        ``abspath_expanduser_unicode`` rejects bytes, returns ``str``,
        resolves relative paths against the cwd or an explicit ``base``, and
        leaves no ``~`` in the result when given ``~``.
        """
        self.assertRaises(AssertionError, fileutil.abspath_expanduser_unicode, b"bytestring")

        saved_cwd = os.path.normpath(os.getcwd())
        abspath_cwd = fileutil.abspath_expanduser_unicode(u".")
        abspath_cwd_notlong = fileutil.abspath_expanduser_unicode(u".", long_path=False)
        self.assertTrue(isinstance(saved_cwd, str), saved_cwd)
        self.assertTrue(isinstance(abspath_cwd, str), abspath_cwd)
        if sys.platform == "win32":
            self.failUnlessReallyEqual(abspath_cwd, fileutil.to_windows_long_path(saved_cwd))
        else:
            self.failUnlessReallyEqual(abspath_cwd, saved_cwd)
        self.failUnlessReallyEqual(abspath_cwd_notlong, saved_cwd)

        # (input, expected) pairs for to_windows_long_path, checked in order.
        long_path_cases = (
            (u"\\\\?\\foo", u"\\\\?\\foo"),
            (u"\\\\.\\foo", u"\\\\.\\foo"),
            (u"\\\\server\\foo", u"\\\\?\\UNC\\server\\foo"),
            (u"C:\\foo", u"\\\\?\\C:\\foo"),
            (u"C:\\foo/bar", u"\\\\?\\C:\\foo\\bar"),
        )
        for given, expected in long_path_cases:
            self.failUnlessReallyEqual(fileutil.to_windows_long_path(given), expected)

        # adapted from <http://svn.python.org/view/python/branches/release26-maint/Lib/test/test_posixpath.py?view=markup&pathrev=78279#test_abspath>

        foo = fileutil.abspath_expanduser_unicode(u"foo")
        self.assertTrue(foo.endswith(u"%sfoo" % (os.path.sep,)), foo)

        foobar = fileutil.abspath_expanduser_unicode(u"bar", base=foo)
        self.assertTrue(foobar.endswith(u"%sfoo%sbar" % (os.path.sep, os.path.sep)), foobar)

        if sys.platform == "win32":
            # This is checking that a drive letter is added for a path without one.
            baz = fileutil.abspath_expanduser_unicode(u"\\baz")
            self.assertTrue(baz.startswith(u"\\\\?\\"), baz)
            self.failUnlessReallyEqual(baz[5:], u":\\baz")

            bar = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz)
            self.assertTrue(bar.startswith(u"\\\\?\\"), bar)
            self.failUnlessReallyEqual(bar[5:], u":\\bar")
            # not u":\\baz\\bar", because \bar is absolute on the current drive.

            self.failUnlessReallyEqual(baz[4], bar[4])  # same drive

            baz_notlong = fileutil.abspath_expanduser_unicode(u"\\baz", long_path=False)
            self.assertFalse(baz_notlong.startswith(u"\\\\?\\"), baz_notlong)
            self.failUnlessReallyEqual(baz_notlong[1:], u":\\baz")

            bar_notlong = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz_notlong, long_path=False)
            self.assertFalse(bar_notlong.startswith(u"\\\\?\\"), bar_notlong)
            self.failUnlessReallyEqual(bar_notlong[1:], u":\\bar")
            # not u":\\baz\\bar", because \bar is absolute on the current drive.

            self.failUnlessReallyEqual(baz_notlong[0], bar_notlong[0])  # same drive

        self.assertNotIn(u"~", fileutil.abspath_expanduser_unicode(u"~"))
        self.assertNotIn(u"~", fileutil.abspath_expanduser_unicode(u"~", long_path=False))

        cwds = ['cwd']
        try:
            cwds.append(u'\xe7w\xf0'.encode(sys.getfilesystemencoding()
                                            or 'ascii'))
        except UnicodeEncodeError:
            pass  # the cwd can't be encoded -- test with ascii cwd only

        for cwd in cwds:
            try:
                os.mkdir(cwd)
                os.chdir(cwd)
                for upath in (u'', u'fuu', u'f\xf9\xf9', u'/fuu', u'U:\\', u'~'):
                    uabspath = fileutil.abspath_expanduser_unicode(upath)
                    self.assertTrue(isinstance(uabspath, str), uabspath)

                    uabspath_notlong = fileutil.abspath_expanduser_unicode(upath, long_path=False)
                    self.assertTrue(isinstance(uabspath_notlong, str), uabspath_notlong)
            finally:
                # Always return to the starting directory, even on failure.
                os.chdir(saved_cwd)

    def test_make_dirs_with_absolute_mode(self):
        """
        Every directory created below the work directory gets mode 0o766; the
        work directory itself does not.
        """
        if sys.platform == 'win32':
            raise unittest.SkipTest("Permissions don't work the same on windows.")

        workdir = fileutil.abspath_expanduser_unicode(u"test_make_dirs_with_absolute_mode")
        fileutil.make_dirs(workdir)
        abspath = fileutil.abspath_expanduser_unicode(u"a/b/c/d", base=workdir)
        fileutil.make_dirs_with_absolute_mode(workdir, abspath, 0o766)

        # Check from the deepest new directory up to the shallowest one.
        for parts in (("a", "b", "c", "d"), ("a", "b", "c"), ("a", "b"), ("a",)):
            new_mode = os.stat(os.path.join(workdir, *parts)).st_mode & 0o777
            self.assertEqual(new_mode, 0o766)

        new_mode = os.stat(workdir).st_mode & 0o777
        self.assertNotEqual(new_mode, 0o766)

    def test_create_long_path(self):
        """
        Even for paths with total length greater than 260 bytes,
        ``fileutil.abspath_expanduser_unicode`` produces a path on which other
        path-related APIs can operate.

        https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx
        documents certain Windows-specific path length limitations this test
        is specifically intended to demonstrate can be overcome.
        """
        workdir = u"test_create_long_path"
        fileutil.make_dirs(workdir)
        base_path = fileutil.abspath_expanduser_unicode(workdir)
        base_length = len(base_path)

        # Construct a path /just/ long enough to exercise the important case.
        # It would be nice if we could just use a seemingly globally valid
        # long file name (the `x...` portion) here - for example, a name 255
        # bytes long- and a previous version of this test did just that.
        # However, aufs imposes a 242 byte length limit on file names.  Most
        # other POSIX filesystems do allow names up to 255 bytes.  It's not
        # clear there's anything we can *do* about lower limits, though, and
        # POSIX.1-2017 (and earlier) only requires that the maximum be at
        # least 14 (!!!) bytes.
        long_path = os.path.join(base_path, u'x' * (261 - base_length))

        def _cleanup():
            fileutil.remove(long_path)
        # Registered as a cleanup and also called explicitly below, so the
        # file is removed whether or not the assertions pass.
        self.addCleanup(_cleanup)

        fileutil.write(long_path, b"test")
        self.assertTrue(os.path.exists(long_path))
        self.assertEqual(fileutil.read(long_path), b"test")
        _cleanup()
        self.assertFalse(os.path.exists(long_path))

    def _test_windows_expanduser(self, userprofile=None, homedrive=None, homepath=None):
        """
        Patch ``fileutil.windows_getenv`` to serve the given values and check
        how ``windows_expanduser`` treats ``~`` in various positions.
        """
        fake_environment = {
            u"USERPROFILE": userprofile,
            u"HOMEDRIVE": homedrive,
            u"HOMEPATH": homepath,
        }

        def call_windows_getenv(name):
            if name not in fake_environment:
                self.fail("unexpected argument to call_windows_getenv")
            return fake_environment[name]
        self.patch(fileutil, 'windows_getenv', call_windows_getenv)

        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100"))
        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~\\foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo"))
        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~/foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo"))
        # A "~" that is not the leading path component is left untouched.
        for unchanged in (u"a", u"a~", u"a\\~\\foo"):
            self.failUnlessReallyEqual(fileutil.windows_expanduser(unchanged), unchanged)

    def test_windows_expanduser_xp(self):
        """Home directory supplied through HOMEDRIVE + HOMEPATH."""
        return self._test_windows_expanduser(homedrive=u"C:", homepath=u"\\Documents and Settings\\\u0100")

    def test_windows_expanduser_win7(self):
        """Home directory supplied through USERPROFILE."""
        return self._test_windows_expanduser(userprofile=os.path.join(u"C:", u"\\Documents and Settings\\\u0100"))

    def test_disk_stats(self):
        """``get_disk_stats`` reports positive sizes (``used`` may be zero)."""
        avail = fileutil.get_available_space('.', 2**14)
        if avail == 0:
            raise unittest.SkipTest("This test will spuriously fail there is no disk space left.")

        disk = fileutil.get_disk_stats('.', 2**13)
        self.assertTrue(disk['total'] > 0, disk['total'])
        # we tolerate used==0 for a Travis-CI bug, see #2290
        self.assertTrue(disk['used'] >= 0, disk['used'])
        self.assertTrue(disk['free_for_root'] > 0, disk['free_for_root'])
        self.assertTrue(disk['free_for_nonroot'] > 0, disk['free_for_nonroot'])
        self.assertTrue(disk['avail'] > 0, disk['avail'])

    def test_disk_stats_avail_nonnegative(self):
        """With a second argument of 2**128, ``avail`` is reported as 0."""
        # This test will spuriously fail if you have more than 2^128
        # bytes of available space on your filesystem.
        disk = fileutil.get_disk_stats('.', 2**128)
        self.assertEqual(disk['avail'], 0)

    def test_get_pathinfo(self):
        """``get_pathinfo`` describes a directory, a file and a missing path."""
        basedir = "util/FileUtil/test_get_pathinfo"
        fileutil.make_dirs(basedir)

        # create a directory
        self.mkdir(basedir, "a")
        dirinfo = fileutil.get_pathinfo(basedir)
        self.assertTrue(dirinfo.isdir)
        self.assertTrue(dirinfo.exists)
        self.assertFalse(dirinfo.isfile)
        self.assertFalse(dirinfo.islink)

        # create a file
        f = os.path.join(basedir, "1.txt")
        fileutil.write(f, b"a" * 10)
        fileinfo = fileutil.get_pathinfo(f)
        self.assertTrue(fileinfo.isfile)
        self.assertTrue(fileinfo.exists)
        self.assertFalse(fileinfo.isdir)
        self.assertFalse(fileinfo.islink)
        self.assertEqual(fileinfo.size, 10)

        # path at which nothing exists
        dnename = os.path.join(basedir, "doesnotexist")
        now_ns = fileutil.seconds_to_ns(time.time())
        dneinfo = fileutil.get_pathinfo(dnename, now_ns=now_ns)
        self.assertFalse(dneinfo.exists)
        self.assertFalse(dneinfo.isfile)
        self.assertFalse(dneinfo.isdir)
        self.assertFalse(dneinfo.islink)
        self.assertEqual(dneinfo.size, None)
        self.assertEqual(dneinfo.mtime_ns, now_ns)
        self.assertEqual(dneinfo.ctime_ns, now_ns)

    def test_get_pathinfo_symlink(self):
        """A symlink is reported as an existing link, not as a file or directory."""
        if not hasattr(os, 'symlink'):
            raise unittest.SkipTest("can't create symlinks on this platform")

        basedir = "util/FileUtil/test_get_pathinfo"
        fileutil.make_dirs(basedir)

        f = os.path.join(basedir, "1.txt")
        fileutil.write(f, b"a" * 10)

        # create a symlink pointing to 1.txt
        slname = os.path.join(basedir, "linkto1.txt")
        os.symlink(f, slname)
        symlinkinfo = fileutil.get_pathinfo(slname)
        self.assertTrue(symlinkinfo.islink)
        self.assertTrue(symlinkinfo.exists)
        self.assertFalse(symlinkinfo.isfile)
        self.assertFalse(symlinkinfo.isdir)

    def test_encrypted_tempfile(self):
        """An ``EncryptedTemporaryFile`` can be written to and closed."""
        f = EncryptedTemporaryFile()
        try:
            f.write(b"foobar")
        finally:
            # Close the temporary file even if the write fails.
            f.close()

    def test_write(self):
        """fileutil.write() can write both unicode and bytes."""
        path = self.mktemp()
        fileutil.write(path, b"abc")
        with open(path, "rb") as f:
            self.assertEqual(f.read(), b"abc")
        fileutil.write(path, u"def \u1234")
        with open(path, "rb") as f:
            self.assertEqual(f.read(), u"def \u1234".encode("utf-8"))
---|
418 | |
---|
419 | |
---|
class PollMixinTests(unittest.TestCase):
    """Tests for ``allmydata.util.pollmixin.PollMixin.poll``."""

    def setUp(self):
        self.pm = pollmixin.PollMixin()

    def test_PollMixin_True(self):
        """Polling a check function that returns True straight away."""
        return self.pm.poll(check_f=lambda: True, pollinterval=0.1)

    def test_PollMixin_False_then_True(self):
        """Polling a check function that returns False once, then True."""
        answers = iter([False, True])
        return self.pm.poll(check_f=lambda: next(answers), pollinterval=0.1)

    def test_timeout(self):
        """A check that never succeeds ends in ``pollmixin.TimeoutError``."""
        def unexpected_success(res):
            self.fail("poll should have failed, not returned %s" % (res,))

        def expected_timeout(failure):
            # trap() re-raises anything that is not a TimeoutError; falling
            # off the end returns None, i.e. the Deferred ends in success.
            failure.trap(pollmixin.TimeoutError)

        polling = self.pm.poll(
            check_f=lambda: False,
            pollinterval=0.01,
            timeout=1,
        )
        polling.addCallbacks(unexpected_success, expected_timeout)
        return polling
---|
446 | |
---|
447 | |
---|
# One-element list used as a mutable module-level counter; each new
# EqButNotIs instance takes the current value as its hash and bumps it.
ctr = [0]


class EqButNotIs:
    """
    Wrapper whose comparisons are delegated to the wrapped value ``x`` while
    its hash comes from a per-instance counter value.
    """

    def __init__(self, x):
        self.x = x
        self.hash = ctr[0]
        ctr[0] += 1

    def __repr__(self):
        return f"<{self.__class__.__name__} {self.x!s}>"

    def __hash__(self):
        return self.hash

    # Every comparison below compares the wrapped value against ``other``.

    def __eq__(self, other):
        return self.x == other

    def __ne__(self, other):
        return self.x != other

    def __lt__(self, other):
        return self.x < other

    def __le__(self, other):
        return self.x <= other

    def __gt__(self, other):
        return self.x > other

    def __ge__(self, other):
        return self.x >= other
---|
470 | |
---|
471 | |
---|
class YAML(unittest.TestCase):
    """Tests for ``allmydata.util.yamlutil``."""

    def test_convert(self):
        """
        Unicode and (ASCII) native strings get roundtripped to Unicode strings.
        """
        originals = ["str", "unicode", "\u1234nicode"]
        dumped = yaml.safe_dump(originals)
        loaded = yamlutil.safe_load(dumped)
        for position in (0, 1, 2):
            self.assertIsInstance(loaded[position], str)
---|
484 | |
---|
485 | |
---|
class JSONBytes(unittest.TestCase):
    """Tests for jsonbytes module."""

    def test_encode_bytes(self):
        """jsonbytes.dumps() encodes bytes.

        Bytes are presumed to be UTF-8 encoded.
        """
        snowman = u"def\N{SNOWMAN}\uFF00"
        snowman_utf8 = snowman.encode("utf-8")
        data = {b"hello": [1, b"cd", {b"abc": [123, snowman_utf8]}]}
        expected = {u"hello": [1, u"cd", {u"abc": [123, snowman]}]}

        # Bytes get passed through as if they were UTF-8 Unicode:
        encoded = jsonbytes.dumps(data)
        for decode in (json.loads, jsonbytes.loads):
            self.assertEqual(decode(encoded), expected)

    def test_encode_unicode(self):
        """jsonbytes.dumps() encodes Unicode string as usual."""
        expected = {u"hello": [1, u"cd"]}
        encoded = jsonbytes.dumps(expected)
        self.assertEqual(json.loads(encoded), expected)

    def test_dumps_bytes(self):
        """jsonbytes.dumps_bytes always returns bytes."""
        original = {u"def\N{SNOWMAN}\uFF00": 123}
        encoded = jsonbytes.dumps_bytes(original)
        self.assertIsInstance(encoded, bytes)
        self.assertEqual(json.loads(encoded), original)

    def test_any_bytes_unsupported_by_default(self):
        """By default non-UTF-8 bytes raise error."""
        bytestring = b"abc\xff\x00"
        # Each encoder is wrapped in a lambda so it only runs inside its own
        # assertRaises context, one after the other.
        encoders = (
            lambda: jsonbytes.dumps(bytestring),
            lambda: jsonbytes.dumps_bytes(bytestring),
            lambda: json.dumps(bytestring, cls=jsonbytes.UTF8BytesJSONEncoder),
        )
        for encode in encoders:
            with self.assertRaises(UnicodeDecodeError):
                encode()

    def test_any_bytes(self):
        """If any_bytes is True, non-UTF-8 bytes don't break encoding."""
        bytestring = b"abc\xff\xff123"
        o = {bytestring: bytestring}
        expected = {"abc\\xff\\xff123": "abc\\xff\\xff123"}
        # NOTE(review): the third encoder repeats the first one exactly, as in
        # the original test -- possibly a different entry point was intended;
        # confirm before changing.
        encoders = (
            lambda: jsonbytes.dumps(o, any_bytes=True),
            lambda: json.dumps(o, cls=jsonbytes.AnyBytesJSONEncoder),
            lambda: jsonbytes.dumps(o, any_bytes=True),
        )
        for encode in encoders:
            self.assertEqual(json.loads(encode()), expected)

    def test_dumps_bytes_unicode_separators(self):
        """Unicode separators don't prevent the result from being bytes."""
        separators = (u',', u':')
        result = jsonbytes.dumps_bytes([1, 2], separators=separators)
        self.assertIsInstance(result, bytes)
        self.assertEqual(result, b"[1,2]")
---|
555 | |
---|
556 | |
---|
557 | |
---|
class FakeGetVersion:
    """Emulate an object with a get_version."""

    def __init__(self, result):
        # Either the value to hand back or an Exception instance to raise.
        self.result = result

    def remote_get_version(self):
        """Return the stored result, or raise it if it is an ``Exception``."""
        outcome = self.result
        if not isinstance(outcome, Exception):
            return outcome
        raise outcome
---|
568 | |
---|
569 | |
---|
class RrefUtilTests(unittest.TestCase):
    """Tests for rrefutil."""

    def test_version_returned(self):
        """If get_version() succeeded, it is set on the rref."""
        rref = LocalWrapper(FakeGetVersion(12345), fireNow)
        adding_version = rrefutil.add_version_to_remote_reference(rref, "default")
        result = self.successResultOf(adding_version)
        self.assertEqual(result.version, 12345)
        self.assertIdentical(result, rref)

    def test_exceptions(self):
        """If get_version() failed, default version is set on the rref."""
        failures = (Violation(), RemoteException(ValueError()))
        for exception in failures:
            rref = LocalWrapper(FakeGetVersion(exception), fireNow)
            adding_version = rrefutil.add_version_to_remote_reference(rref, "Default")
            result = self.successResultOf(adding_version)
            self.assertEqual(result.version, "Default")
            self.assertIdentical(result, rref)
---|
591 | |
---|
592 | |
---|
class CPUThreadPool(unittest.TestCase):
    """Tests for cputhreadpool."""

    async def test_runs_in_thread(self):
        """The given function runs in a thread."""
        def report(*args, **kwargs):
            # Hand back the thread object this ran on plus what it was given.
            return current_thread(), args, kwargs

        caller_ident = current_thread().ident
        worker, got_args, got_kwargs = await defer_to_thread(
            report, 1, 3, key=4, value=5
        )

        # The task ran in a different thread:
        self.assertNotEqual(worker.ident, caller_ident)
        self.assertEqual(got_args, (1, 3))
        self.assertEqual(got_kwargs, {"key": 4, "value": 5})

    async def test_when_disabled_runs_in_same_thread(self):
        """
        If the CPU thread pool is disabled, the given function runs in the
        current thread.
        """
        disable_thread_pool_for_test(self)

        def report(*args, **kwargs):
            # Here only the thread identifier is handed back.
            return current_thread().ident, args, kwargs

        caller_ident = current_thread().ident
        worker_ident, got_args, got_kwargs = await defer_to_thread(
            report, 1, 3, key=4, value=5
        )

        self.assertEqual(worker_ident, caller_ident)
        self.assertEqual(got_args, (1, 3))
        self.assertEqual(got_kwargs, {"key": 4, "value": 5})
---|