', res), res)
d.addCallback(_check3)
# and an empty directory
- d.addCallback(lambda res: self.GET(self.public_url + "/foo/empty/"))
+ d.addCallback(lambda res: self.shouldSucceedGET(self.public_url + "/foo/empty/"))
def _check4(res):
self.failUnless("directory is empty" in res, res)
MKDIR_BUTTON_RE=re.compile('.*.*', re.I)
@@ -981,7 +1066,7 @@
return d
def test_GET_DIRURL_badtype(self):
- d = self.shouldHTTPError("test_GET_DIRURL_badtype",
+ d = self.shouldHTTPError("GET_DIRURL_badtype",
400, "Bad Request",
"bad t=bogus",
self.GET,
@@ -989,14 +1074,14 @@
return d
def test_GET_DIRURL_json(self):
- d = self.GET(self.public_url + "/foo?t=json")
+ d = self.shouldSucceedGET(self.public_url + "/foo?t=json")
d.addCallback(self.failUnlessIsFooJSON)
return d
def test_POST_DIRURL_manifest_no_ophandle(self):
d = self.shouldFail2(error.Error,
- "test_POST_DIRURL_manifest_no_ophandle",
+ "POST_DIRURL_manifest_no_ophandle",
"400 Bad Request",
"slow operation requires ophandle=",
self.POST, self.public_url, t="start-manifest")
@@ -1005,8 +1090,9 @@
def test_POST_DIRURL_manifest(self):
d = defer.succeed(None)
def getman(ignored, output):
- d = self.POST(self.public_url + "/foo/?t=start-manifest&ophandle=125",
- followRedirect=True)
+ d = self.shouldSucceed("POST_DIRURL_manifest", http.OK, self.POST,
+ self.public_url + "/foo/?t=start-manifest&ophandle=125",
+ followRedirect=True)
d.addCallback(self.wait_for_operation, "125")
d.addCallback(self.get_operation_results, "125", output)
return d
@@ -1019,7 +1105,7 @@
d.addCallback(_got_html)
# both t=status and unadorned GET should be identical
- d.addCallback(lambda res: self.GET("/operations/125"))
+ d.addCallback(lambda res: self.shouldSucceedGET("/operations/125"))
d.addCallback(_got_html)
d.addCallback(getman, "html")
@@ -1047,15 +1133,16 @@
def test_POST_DIRURL_deepsize_no_ophandle(self):
d = self.shouldFail2(error.Error,
- "test_POST_DIRURL_deepsize_no_ophandle",
+ "POST_DIRURL_deepsize_no_ophandle",
"400 Bad Request",
"slow operation requires ophandle=",
self.POST, self.public_url, t="start-deep-size")
return d
def test_POST_DIRURL_deepsize(self):
- d = self.POST(self.public_url + "/foo/?t=start-deep-size&ophandle=126",
- followRedirect=True)
+ d = self.shouldSucceed("POST_DIRURL_deepsize", http.OK, self.POST,
+ self.public_url + "/foo/?t=start-deep-size&ophandle=126",
+ followRedirect=True)
d.addCallback(self.wait_for_operation, "126")
d.addCallback(self.get_operation_results, "126", "json")
def _got_json(data):
@@ -1075,15 +1162,16 @@
def test_POST_DIRURL_deepstats_no_ophandle(self):
d = self.shouldFail2(error.Error,
- "test_POST_DIRURL_deepstats_no_ophandle",
+ "POST_DIRURL_deepstats_no_ophandle",
"400 Bad Request",
"slow operation requires ophandle=",
self.POST, self.public_url, t="start-deep-stats")
return d
def test_POST_DIRURL_deepstats(self):
- d = self.POST(self.public_url + "/foo/?t=start-deep-stats&ophandle=127",
- followRedirect=True)
+ d = self.shouldSucceed("POST_DIRURL_deepstats", http.OK, self.POST,
+ self.public_url + "/foo/?t=start-deep-stats&ophandle=127",
+ followRedirect=True)
d.addCallback(self.wait_for_operation, "127")
d.addCallback(self.get_operation_results, "127", "json")
def _got_json(stats):
@@ -1109,7 +1197,8 @@
return d
def test_POST_DIRURL_stream_manifest(self):
- d = self.POST(self.public_url + "/foo/?t=stream-manifest")
+ d = self.shouldSucceed("POST_DIRURL_stream_manifest", http.OK, self.POST,
+ self.public_url + "/foo/?t=stream-manifest")
def _check(res):
self.failUnless(res.endswith("\n"))
units = [simplejson.loads(t) for t in res[:-1].split("\n")]
@@ -1129,21 +1218,22 @@
return d
def test_GET_DIRURL_uri(self):
- d = self.GET(self.public_url + "/foo?t=uri")
+ d = self.shouldSucceedGET(self.public_url + "/foo?t=uri")
def _check(res):
self.failUnlessEqual(res, self._foo_uri)
d.addCallback(_check)
return d
def test_GET_DIRURL_readonly_uri(self):
- d = self.GET(self.public_url + "/foo?t=readonly-uri")
+ d = self.shouldSucceedGET(self.public_url + "/foo?t=readonly-uri")
def _check(res):
self.failUnlessEqual(res, self._foo_readonly_uri)
d.addCallback(_check)
return d
def test_PUT_NEWDIRURL(self):
- d = self.PUT(self.public_url + "/foo/newdir?t=mkdir", "")
+ d = self.shouldSucceed("PUT_NEWDIRURL", http.OK, self.PUT,
+ self.public_url + "/foo/newdir?t=mkdir", "")
d.addCallback(lambda res:
self.failUnlessNodeHasChild(self._foo_node, u"newdir"))
d.addCallback(lambda res: self._foo_node.get(u"newdir"))
@@ -1151,7 +1241,8 @@
return d
def test_POST_NEWDIRURL(self):
- d = self.POST2(self.public_url + "/foo/newdir?t=mkdir", "")
+ d = self.shouldSucceed("POST_NEWDIRURL", http.OK, self.POST2,
+ self.public_url + "/foo/newdir?t=mkdir", "")
d.addCallback(lambda res:
self.failUnlessNodeHasChild(self._foo_node, u"newdir"))
d.addCallback(lambda res: self._foo_node.get(u"newdir"))
@@ -1160,30 +1251,41 @@
def test_POST_NEWDIRURL_emptyname(self):
# an empty pathname component (i.e. a double-slash) is disallowed
- d = self.shouldFail2(error.Error, "test_POST_NEWDIRURL_emptyname",
+ d = self.shouldFail2(error.Error, "POST_NEWDIRURL_emptyname",
"400 Bad Request",
"The webapi does not allow empty pathname components, i.e. a double slash",
self.POST, self.public_url + "//?t=mkdir")
return d
def test_POST_NEWDIRURL_initial_children(self):
- (newkids, filecap1, filecap2, filecap3,
- dircap) = self._create_initial_children()
- d = self.POST2(self.public_url + "/foo/newdir?t=mkdir-with-children",
- simplejson.dumps(newkids))
+ (newkids, caps) = self._create_initial_children()
+ d = self.shouldSucceed("POST_NEWDIRURL_initial_children", http.OK, self.POST2,
+ self.public_url + "/foo/newdir?t=mkdir-with-children",
+ simplejson.dumps(newkids))
def _check(uri):
n = self.s.create_node_from_uri(uri.strip())
d2 = self.failUnlessNodeKeysAre(n, newkids.keys())
d2.addCallback(lambda ign:
- self.failUnlessChildURIIs(n, u"child-imm", filecap1))
+ self.failUnlessROChildURIIs(n, u"child-imm",
+ caps['filecap1']))
+ d2.addCallback(lambda ign:
+ self.failUnlessRWChildURIIs(n, u"child-mutable",
+ caps['filecap2']))
+ d2.addCallback(lambda ign:
+ self.failUnlessROChildURIIs(n, u"child-mutable-ro",
+ caps['filecap3']))
d2.addCallback(lambda ign:
- self.failUnlessChildURIIs(n, u"child-mutable",
- filecap2))
+ self.failUnlessROChildURIIs(n, u"unknownchild-ro",
+ caps['unknown_rocap']))
d2.addCallback(lambda ign:
- self.failUnlessChildURIIs(n, u"child-mutable-ro",
- filecap3))
+ self.failUnlessRWChildURIIs(n, u"unknownchild-rw",
+ caps['unknown_rwcap']))
d2.addCallback(lambda ign:
- self.failUnlessChildURIIs(n, u"dirchild", dircap))
+ self.failUnlessROChildURIIs(n, u"unknownchild-imm",
+ caps['unknown_immcap']))
+ d2.addCallback(lambda ign:
+ self.failUnlessRWChildURIIs(n, u"dirchild",
+ caps['dircap']))
return d2
d.addCallback(_check)
d.addCallback(lambda res:
@@ -1191,21 +1293,26 @@
d.addCallback(lambda res: self._foo_node.get(u"newdir"))
d.addCallback(self.failUnlessNodeKeysAre, newkids.keys())
d.addCallback(lambda res: self._foo_node.get(u"newdir"))
- d.addCallback(self.failUnlessChildURIIs, u"child-imm", filecap1)
+ d.addCallback(self.failUnlessROChildURIIs, u"child-imm", caps['filecap1'])
return d
def test_POST_NEWDIRURL_immutable(self):
- (newkids, filecap1, immdircap) = self._create_immutable_children()
- d = self.POST2(self.public_url + "/foo/newdir?t=mkdir-immutable",
- simplejson.dumps(newkids))
+ (newkids, caps) = self._create_immutable_children()
+ d = self.shouldSucceed("POST_NEWDIRURL_immutable", http.OK, self.POST2,
+ self.public_url + "/foo/newdir?t=mkdir-immutable",
+ simplejson.dumps(newkids))
def _check(uri):
n = self.s.create_node_from_uri(uri.strip())
d2 = self.failUnlessNodeKeysAre(n, newkids.keys())
d2.addCallback(lambda ign:
- self.failUnlessChildURIIs(n, u"child-imm", filecap1))
+ self.failUnlessROChildURIIs(n, u"child-imm",
+ caps['filecap1']))
+ d2.addCallback(lambda ign:
+ self.failUnlessROChildURIIs(n, u"unknownchild-imm",
+ caps['unknown_immcap']))
d2.addCallback(lambda ign:
- self.failUnlessChildURIIs(n, u"dirchild-imm",
- immdircap))
+ self.failUnlessROChildURIIs(n, u"dirchild-imm",
+ caps['immdircap']))
return d2
d.addCallback(_check)
d.addCallback(lambda res:
@@ -1213,25 +1320,27 @@
d.addCallback(lambda res: self._foo_node.get(u"newdir"))
d.addCallback(self.failUnlessNodeKeysAre, newkids.keys())
d.addCallback(lambda res: self._foo_node.get(u"newdir"))
- d.addCallback(self.failUnlessChildURIIs, u"child-imm", filecap1)
+ d.addCallback(self.failUnlessROChildURIIs, u"child-imm", caps['filecap1'])
+ d.addCallback(lambda res: self._foo_node.get(u"newdir"))
+ d.addCallback(self.failUnlessROChildURIIs, u"unknownchild-imm", caps['unknown_immcap'])
d.addCallback(lambda res: self._foo_node.get(u"newdir"))
- d.addCallback(self.failUnlessChildURIIs, u"dirchild-imm", immdircap)
+ d.addCallback(self.failUnlessROChildURIIs, u"dirchild-imm", caps['immdircap'])
d.addErrback(self.explain_web_error)
return d
def test_POST_NEWDIRURL_immutable_bad(self):
- (newkids, filecap1, filecap2, filecap3,
- dircap) = self._create_initial_children()
- d = self.shouldFail2(error.Error, "test_POST_NEWDIRURL_immutable_bad",
+ (newkids, caps) = self._create_initial_children()
+ d = self.shouldFail2(error.Error, "POST_NEWDIRURL_immutable_bad",
"400 Bad Request",
- "a mkdir-immutable operation was given a child that was not itself immutable",
+ "needed to be immutable but was not",
self.POST2,
self.public_url + "/foo/newdir?t=mkdir-immutable",
simplejson.dumps(newkids))
return d
def test_PUT_NEWDIRURL_exists(self):
- d = self.PUT(self.public_url + "/foo/sub?t=mkdir", "")
+ d = self.shouldSucceed("PUT_NEWDIRURL_exists", http.OK, self.PUT,
+ self.public_url + "/foo/sub?t=mkdir", "")
d.addCallback(lambda res:
self.failUnlessNodeHasChild(self._foo_node, u"sub"))
d.addCallback(lambda res: self._foo_node.get(u"sub"))
@@ -1249,18 +1358,21 @@
d.addCallback(self.failUnlessNodeKeysAre, [u"baz.txt"])
return d
- def test_PUT_NEWDIRURL_mkdir_p(self):
+ def test_POST_NEWDIRURL_mkdir_p(self):
d = defer.succeed(None)
- d.addCallback(lambda res: self.POST(self.public_url + "/foo", t='mkdir', name='mkp'))
+ d.addCallback(lambda res: self.shouldSucceed("POST_NEWDIRURL_mkdir_p-1", http.OK, self.POST,
+ self.public_url + "/foo", t='mkdir', name='mkp'))
d.addCallback(lambda res: self.failUnlessNodeHasChild(self._foo_node, u"mkp"))
d.addCallback(lambda res: self._foo_node.get(u"mkp"))
def mkdir_p(mkpnode):
url = '/uri/%s?t=mkdir-p&path=/sub1/sub2' % urllib.quote(mkpnode.get_uri())
- d = self.POST(url)
+ d = self.shouldSucceed("POST_NEWDIRURL_mkdir_p-2", http.OK, self.POST,
+ url)
def made_subsub(ssuri):
d = self._foo_node.get_child_at_path(u"mkp/sub1/sub2")
d.addCallback(lambda ssnode: self.failUnlessEqual(ssnode.get_uri(), ssuri))
- d = self.POST(url)
+ d = self.shouldSucceed("POST_NEWDIRURL_mkdir_p-3", http.OK, self.POST,
+ url)
d.addCallback(lambda uri2: self.failUnlessEqual(uri2, ssuri))
return d
d.addCallback(made_subsub)
@@ -1269,7 +1381,8 @@
return d
def test_PUT_NEWDIRURL_mkdirs(self):
- d = self.PUT(self.public_url + "/foo/subdir/newdir?t=mkdir", "")
+ d = self.shouldSucceed("PUT_NEWDIRURL_mkdirs", http.OK, self.PUT,
+ self.public_url + "/foo/subdir/newdir?t=mkdir", "")
d.addCallback(lambda res:
self.failIfNodeHasChild(self._foo_node, u"newdir"))
d.addCallback(lambda res:
@@ -1280,21 +1393,22 @@
return d
def test_DELETE_DIRURL(self):
- d = self.DELETE(self.public_url + "/foo")
+ d = self.shouldSucceed("DELETE_DIRURL", http.OK, self.DELETE,
+ self.public_url + "/foo")
d.addCallback(lambda res:
self.failIfNodeHasChild(self.public_root, u"foo"))
return d
def test_DELETE_DIRURL_missing(self):
d = self.DELETE(self.public_url + "/foo/missing")
- d.addBoth(self.should404, "test_DELETE_DIRURL_missing")
+ d.addBoth(self.should404, "DELETE_DIRURL_missing")
d.addCallback(lambda res:
self.failUnlessNodeHasChild(self.public_root, u"foo"))
return d
def test_DELETE_DIRURL_missing2(self):
d = self.DELETE(self.public_url + "/missing")
- d.addBoth(self.should404, "test_DELETE_DIRURL_missing2")
+ d.addBoth(self.should404, "DELETE_DIRURL_missing2")
return d
def dump_root(self):
@@ -1346,18 +1460,44 @@
d.addCallback(_check)
return d
- def failUnlessChildURIIs(self, node, name, expected_uri):
+ def failUnlessRWChildURIIs(self, node, name, expected_uri):
+ assert isinstance(name, unicode)
+ d = node.get_child_at_path(name)
+ def _check(child):
+ self.failUnless(child.is_unknown() or not child.is_readonly())
+ self.failUnlessEqual(child.get_uri(), expected_uri.strip())
+ expected_ro_uri = self._make_readonly(expected_uri)
+ if expected_ro_uri:
+ self.failUnlessEqual(child.get_readonly_uri(), expected_ro_uri.strip())
+ d.addCallback(_check)
+ return d
+
+ def failUnlessROChildURIIs(self, node, name, expected_uri):
assert isinstance(name, unicode)
d = node.get_child_at_path(name)
def _check(child):
+ self.failUnless(child.is_unknown() or child.is_readonly())
self.failUnlessEqual(child.get_uri(), expected_uri.strip())
d.addCallback(_check)
return d
- def failUnlessURIMatchesChild(self, got_uri, node, name):
+ def failUnlessURIMatchesRWChild(self, got_uri, node, name):
+ assert isinstance(name, unicode)
+ d = node.get_child_at_path(name)
+ def _check(child):
+ self.failUnless(child.is_unknown() or not child.is_readonly())
+ self.failUnlessEqual(child.get_uri(), got_uri.strip())
+ expected_ro_uri = self._make_readonly(got_uri)
+ if expected_ro_uri:
+ self.failUnlessEqual(child.get_readonly_uri(), expected_ro_uri.strip())
+ d.addCallback(_check)
+ return d
+
+ def failUnlessURIMatchesROChild(self, got_uri, node, name):
assert isinstance(name, unicode)
d = node.get_child_at_path(name)
def _check(child):
+ self.failUnless(child.is_unknown() or child.is_readonly())
self.failUnlessEqual(got_uri.strip(), child.get_uri())
d.addCallback(_check)
return d
@@ -1366,10 +1506,11 @@
self.failUnless(FakeCHKFileNode.all_contents[got_uri] == contents)
def test_POST_upload(self):
- d = self.POST(self.public_url + "/foo", t="upload",
- file=("new.txt", self.NEWFILE_CONTENTS))
+ d = self.shouldSucceed("POST_upload", http.OK, self.POST,
+ self.public_url + "/foo", t="upload",
+ file=("new.txt", self.NEWFILE_CONTENTS))
fn = self._foo_node
- d.addCallback(self.failUnlessURIMatchesChild, fn, u"new.txt")
+ d.addCallback(self.failUnlessURIMatchesROChild, fn, u"new.txt")
d.addCallback(lambda res:
self.failUnlessChildContentsAre(fn, u"new.txt",
self.NEWFILE_CONTENTS))
@@ -1377,15 +1518,16 @@
def test_POST_upload_unicode(self):
filename = u"n\u00e9wer.txt" # n e-acute w e r . t x t
- d = self.POST(self.public_url + "/foo", t="upload",
- file=(filename, self.NEWFILE_CONTENTS))
+ d = self.shouldSucceed("POST_upload_unicode", http.OK, self.POST,
+ self.public_url + "/foo", t="upload",
+ file=(filename, self.NEWFILE_CONTENTS))
fn = self._foo_node
- d.addCallback(self.failUnlessURIMatchesChild, fn, filename)
+ d.addCallback(self.failUnlessURIMatchesROChild, fn, filename)
d.addCallback(lambda res:
self.failUnlessChildContentsAre(fn, filename,
self.NEWFILE_CONTENTS))
target_url = self.public_url + "/foo/" + filename.encode("utf-8")
- d.addCallback(lambda res: self.GET(target_url))
+ d.addCallback(lambda res: self.shouldSucceedGET(target_url))
d.addCallback(lambda contents: self.failUnlessEqual(contents,
self.NEWFILE_CONTENTS,
contents))
@@ -1393,24 +1535,26 @@
def test_POST_upload_unicode_named(self):
filename = u"n\u00e9wer.txt" # n e-acute w e r . t x t
- d = self.POST(self.public_url + "/foo", t="upload",
- name=filename,
- file=("overridden", self.NEWFILE_CONTENTS))
+ d = self.shouldSucceed("POST_upload_unicode_named", http.OK, self.POST,
+ self.public_url + "/foo", t="upload",
+ name=filename,
+ file=("overridden", self.NEWFILE_CONTENTS))
fn = self._foo_node
- d.addCallback(self.failUnlessURIMatchesChild, fn, filename)
+ d.addCallback(self.failUnlessURIMatchesROChild, fn, filename)
d.addCallback(lambda res:
self.failUnlessChildContentsAre(fn, filename,
self.NEWFILE_CONTENTS))
target_url = self.public_url + "/foo/" + filename.encode("utf-8")
- d.addCallback(lambda res: self.GET(target_url))
+ d.addCallback(lambda res: self.shouldSucceedGET(target_url))
d.addCallback(lambda contents: self.failUnlessEqual(contents,
self.NEWFILE_CONTENTS,
contents))
return d
def test_POST_upload_no_link(self):
- d = self.POST("/uri", t="upload",
- file=("new.txt", self.NEWFILE_CONTENTS))
+ d = self.shouldSucceed("POST_upload_no_link", http.OK, self.POST,
+ "/uri", t="upload",
+ file=("new.txt", self.NEWFILE_CONTENTS))
def _check_upload_results(page):
# this should be a page which describes the results of the upload
# that just finished.
@@ -1449,7 +1593,7 @@
self.failUnlessEqual(statuscode, str(http.FOUND))
self.failUnless(target.startswith(self.webish_url), target)
return client.getPage(target, method="GET")
- d = self.shouldRedirect2("test_POST_upload_no_link_whendone_results",
+ d = self.shouldRedirect2("POST_upload_no_link_whendone_results",
check,
self.POST, "/uri", t="upload",
when_done="/uri/%(uri)s",
@@ -1459,8 +1603,9 @@
return d
def test_POST_upload_no_link_mutable(self):
- d = self.POST("/uri", t="upload", mutable="true",
- file=("new.txt", self.NEWFILE_CONTENTS))
+ d = self.shouldSucceed("POST_upload_no_link_mutable", http.OK, self.POST,
+ "/uri", t="upload", mutable="true",
+ file=("new.txt", self.NEWFILE_CONTENTS))
def _check(filecap):
filecap = filecap.strip()
self.failUnless(filecap.startswith("URI:SSK:"), filecap)
@@ -1472,11 +1617,11 @@
d.addCallback(_check)
def _check2(data):
self.failUnlessEqual(data, self.NEWFILE_CONTENTS)
- return self.GET("/uri/%s" % urllib.quote(self.filecap))
+ return self.shouldSucceedGET("/uri/%s" % urllib.quote(self.filecap))
d.addCallback(_check2)
def _check3(data):
self.failUnlessEqual(data, self.NEWFILE_CONTENTS)
- return self.GET("/file/%s" % urllib.quote(self.filecap))
+ return self.shouldSucceedGET("/file/%s" % urllib.quote(self.filecap))
d.addCallback(_check3)
def _check4(data):
self.failUnlessEqual(data, self.NEWFILE_CONTENTS)
@@ -1485,7 +1630,7 @@
def test_POST_upload_no_link_mutable_toobig(self):
d = self.shouldFail2(error.Error,
- "test_POST_upload_no_link_mutable_toobig",
+ "POST_upload_no_link_mutable_toobig",
"413 Request Entity Too Large",
"SDMF is limited to one segment, and 10001 > 10000",
self.POST,
@@ -1496,10 +1641,11 @@
def test_POST_upload_mutable(self):
# this creates a mutable file
- d = self.POST(self.public_url + "/foo", t="upload", mutable="true",
- file=("new.txt", self.NEWFILE_CONTENTS))
+ d = self.shouldSucceed("POST_upload_mutable", http.OK, self.POST,
+ self.public_url + "/foo", t="upload", mutable="true",
+ file=("new.txt", self.NEWFILE_CONTENTS))
fn = self._foo_node
- d.addCallback(self.failUnlessURIMatchesChild, fn, u"new.txt")
+ d.addCallback(self.failUnlessURIMatchesRWChild, fn, u"new.txt")
d.addCallback(lambda res:
self.failUnlessMutableChildContentsAre(fn, u"new.txt",
self.NEWFILE_CONTENTS))
@@ -1515,10 +1661,11 @@
# now upload it again and make sure that the URI doesn't change
NEWER_CONTENTS = self.NEWFILE_CONTENTS + "newer\n"
d.addCallback(lambda res:
- self.POST(self.public_url + "/foo", t="upload",
- mutable="true",
- file=("new.txt", NEWER_CONTENTS)))
- d.addCallback(self.failUnlessURIMatchesChild, fn, u"new.txt")
+ self.shouldSucceed("POST_upload_mutable-again", http.OK, self.POST,
+ self.public_url + "/foo", t="upload",
+ mutable="true",
+ file=("new.txt", NEWER_CONTENTS)))
+ d.addCallback(self.failUnlessURIMatchesRWChild, fn, u"new.txt")
d.addCallback(lambda res:
self.failUnlessMutableChildContentsAre(fn, u"new.txt",
NEWER_CONTENTS))
@@ -1533,8 +1680,9 @@
# upload a second time, using PUT instead of POST
NEW2_CONTENTS = NEWER_CONTENTS + "overwrite with PUT\n"
d.addCallback(lambda res:
- self.PUT(self.public_url + "/foo/new.txt", NEW2_CONTENTS))
- d.addCallback(self.failUnlessURIMatchesChild, fn, u"new.txt")
+ self.shouldSucceed("POST_upload_mutable-again-with-PUT", http.OK, self.PUT,
+ self.public_url + "/foo/new.txt", NEW2_CONTENTS))
+ d.addCallback(self.failUnlessURIMatchesRWChild, fn, u"new.txt")
d.addCallback(lambda res:
self.failUnlessMutableChildContentsAre(fn, u"new.txt",
NEW2_CONTENTS))
@@ -1543,8 +1691,8 @@
# slightly differently
d.addCallback(lambda res:
- self.GET(self.public_url + "/foo/",
- followRedirect=True))
+ self.shouldSucceedGET(self.public_url + "/foo/",
+ followRedirect=True))
def _check_page(res):
# TODO: assert more about the contents
self.failUnless("SSK" in res)
@@ -1561,8 +1709,8 @@
# look at the JSON form of the enclosing directory
d.addCallback(lambda res:
- self.GET(self.public_url + "/foo/?t=json",
- followRedirect=True))
+ self.shouldSucceedGET(self.public_url + "/foo/?t=json",
+ followRedirect=True))
def _check_page_json(res):
parsed = simplejson.loads(res)
self.failUnlessEqual(parsed[0], "dirnode")
@@ -1580,7 +1728,7 @@
# and the JSON form of the file
d.addCallback(lambda res:
- self.GET(self.public_url + "/foo/new.txt?t=json"))
+ self.shouldSucceedGET(self.public_url + "/foo/new.txt?t=json"))
def _check_file_json(res):
parsed = simplejson.loads(res)
self.failUnlessEqual(parsed[0], "filenode")
@@ -1592,10 +1740,10 @@
# and look at t=uri and t=readonly-uri
d.addCallback(lambda res:
- self.GET(self.public_url + "/foo/new.txt?t=uri"))
+ self.shouldSucceedGET(self.public_url + "/foo/new.txt?t=uri"))
d.addCallback(lambda res: self.failUnlessEqual(res, self._mutable_uri))
d.addCallback(lambda res:
- self.GET(self.public_url + "/foo/new.txt?t=readonly-uri"))
+ self.shouldSucceedGET(self.public_url + "/foo/new.txt?t=readonly-uri"))
def _check_ro_uri(res):
ro_uri = unicode(self._mutable_node.get_readonly().to_string())
self.failUnlessEqual(res, ro_uri)
@@ -1603,15 +1751,15 @@
# make sure we can get to it from /uri/URI
d.addCallback(lambda res:
- self.GET("/uri/%s" % urllib.quote(self._mutable_uri)))
+ self.shouldSucceedGET("/uri/%s" % urllib.quote(self._mutable_uri)))
d.addCallback(lambda res:
self.failUnlessEqual(res, NEW2_CONTENTS))
# and that HEAD computes the size correctly
d.addCallback(lambda res:
- self.HEAD(self.public_url + "/foo/new.txt",
- return_response=True))
- def _got_headers((res, status, headers)):
+ self.shouldSucceedHEAD(self.public_url + "/foo/new.txt",
+ return_response=True))
+ def _got_headers((res, statuscode, headers)):
self.failUnlessEqual(res, "")
self.failUnlessEqual(headers["content-length"][0],
str(len(NEW2_CONTENTS)))
@@ -1621,7 +1769,7 @@
# make sure that size errors are displayed correctly for overwrite
d.addCallback(lambda res:
self.shouldFail2(error.Error,
- "test_POST_upload_mutable-toobig",
+ "POST_upload_mutable-toobig",
"413 Request Entity Too Large",
"SDMF is limited to one segment, and 10001 > 10000",
self.POST,
@@ -1636,7 +1784,7 @@
def test_POST_upload_mutable_toobig(self):
d = self.shouldFail2(error.Error,
- "test_POST_upload_mutable_toobig",
+ "POST_upload_mutable_toobig",
"413 Request Entity Too Large",
"SDMF is limited to one segment, and 10001 > 10000",
self.POST,
@@ -1660,19 +1808,21 @@
return f
def test_POST_upload_replace(self):
- d = self.POST(self.public_url + "/foo", t="upload",
- file=("bar.txt", self.NEWFILE_CONTENTS))
+ d = self.shouldSucceed("POST_upload_replace", http.OK, self.POST,
+ self.public_url + "/foo", t="upload",
+ file=("bar.txt", self.NEWFILE_CONTENTS))
fn = self._foo_node
- d.addCallback(self.failUnlessURIMatchesChild, fn, u"bar.txt")
+ d.addCallback(self.failUnlessURIMatchesROChild, fn, u"bar.txt")
d.addCallback(lambda res:
self.failUnlessChildContentsAre(fn, u"bar.txt",
self.NEWFILE_CONTENTS))
return d
def test_POST_upload_no_replace_ok(self):
- d = self.POST(self.public_url + "/foo?replace=false", t="upload",
- file=("new.txt", self.NEWFILE_CONTENTS))
- d.addCallback(lambda res: self.GET(self.public_url + "/foo/new.txt"))
+ d = self.shouldSucceed("POST_upload_no_replace_ok", http.OK, self.POST,
+ self.public_url + "/foo?replace=false", t="upload",
+ file=("new.txt", self.NEWFILE_CONTENTS))
+ d.addCallback(lambda res: self.shouldSucceedGET(self.public_url + "/foo/new.txt"))
d.addCallback(lambda res: self.failUnlessEqual(res,
self.NEWFILE_CONTENTS))
return d
@@ -1685,7 +1835,7 @@
"409 Conflict",
"There was already a child by that name, and you asked me "
"to not replace it")
- d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
+ d.addCallback(lambda res: self.shouldSucceedGET(self.public_url + "/foo/bar.txt"))
d.addCallback(self.failUnlessIsBarDotTxt)
return d
@@ -1696,7 +1846,7 @@
"409 Conflict",
"There was already a child by that name, and you asked me "
"to not replace it")
- d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
+ d.addCallback(lambda res: self.shouldSucceedGET(self.public_url + "/foo/bar.txt"))
d.addCallback(self.failUnlessIsBarDotTxt)
return d
@@ -1712,9 +1862,10 @@
def test_POST_upload_named(self):
fn = self._foo_node
- d = self.POST(self.public_url + "/foo", t="upload",
- name="new.txt", file=self.NEWFILE_CONTENTS)
- d.addCallback(self.failUnlessURIMatchesChild, fn, u"new.txt")
+ d = self.shouldSucceed("POST_upload_named", http.OK, self.POST,
+ self.public_url + "/foo", t="upload",
+ name="new.txt", file=self.NEWFILE_CONTENTS)
+ d.addCallback(self.failUnlessURIMatchesROChild, fn, u"new.txt")
d.addCallback(lambda res:
self.failUnlessChildContentsAre(fn, u"new.txt",
self.NEWFILE_CONTENTS))
@@ -1724,7 +1875,7 @@
d = self.POST(self.public_url + "/foo", t="upload",
name="slashes/are/bad.txt", file=self.NEWFILE_CONTENTS)
d.addBoth(self.shouldFail, error.Error,
- "test_POST_upload_named_badfilename",
+ "POST_upload_named_badfilename",
"400 Bad Request",
"name= may not contain a slash",
)
@@ -1738,7 +1889,8 @@
def test_POST_FILEURL_check(self):
bar_url = self.public_url + "/foo/bar.txt"
- d = self.POST(bar_url, t="check")
+ d = self.shouldSucceed("POST_FILEURL_check-1", http.OK, self.POST,
+ bar_url, t="check")
def _check(res):
self.failUnless("Healthy :" in res)
d.addCallback(_check)
@@ -1747,13 +1899,14 @@
self.failUnlessEqual(statuscode, str(http.FOUND))
self.failUnlessEqual(target, redir_url)
d.addCallback(lambda res:
- self.shouldRedirect2("test_POST_FILEURL_check",
+ self.shouldRedirect2("POST_FILEURL_check-2",
_check2,
self.POST, bar_url,
t="check",
when_done=redir_url))
d.addCallback(lambda res:
- self.POST(bar_url, t="check", return_to=redir_url))
+ self.shouldSucceed("POST_FILEURL_check-3", http.OK, self.POST,
+ bar_url, t="check", return_to=redir_url))
def _check3(res):
self.failUnless("Healthy :" in res)
self.failUnless("Return to file" in res)
@@ -1761,7 +1914,8 @@
d.addCallback(_check3)
d.addCallback(lambda res:
- self.POST(bar_url, t="check", output="JSON"))
+ self.shouldSucceed("POST_FILEURL_check-4", http.OK, self.POST,
+ bar_url, t="check", output="JSON"))
def _check_json(res):
data = simplejson.loads(res)
self.failUnless("storage-index" in data)
@@ -1772,7 +1926,8 @@
def test_POST_FILEURL_check_and_repair(self):
bar_url = self.public_url + "/foo/bar.txt"
- d = self.POST(bar_url, t="check", repair="true")
+ d = self.shouldSucceed("POST_FILEURL_check_and_repair-1", http.OK, self.POST,
+ bar_url, t="check", repair="true")
def _check(res):
self.failUnless("Healthy :" in res)
d.addCallback(_check)
@@ -1781,13 +1936,14 @@
self.failUnlessEqual(statuscode, str(http.FOUND))
self.failUnlessEqual(target, redir_url)
d.addCallback(lambda res:
- self.shouldRedirect2("test_POST_FILEURL_check_and_repair",
+ self.shouldRedirect2("POST_FILEURL_check_and_repair-2",
_check2,
self.POST, bar_url,
t="check", repair="true",
when_done=redir_url))
d.addCallback(lambda res:
- self.POST(bar_url, t="check", return_to=redir_url))
+ self.shouldSucceed("POST_FILEURL_check_and_repair-3", http.OK, self.POST,
+ bar_url, t="check", return_to=redir_url))
def _check3(res):
self.failUnless("Healthy :" in res)
self.failUnless("Return to file" in res)
@@ -1797,7 +1953,8 @@
def test_POST_DIRURL_check(self):
foo_url = self.public_url + "/foo/"
- d = self.POST(foo_url, t="check")
+ d = self.shouldSucceed("POST_DIRURL_check-1", http.OK, self.POST,
+ foo_url, t="check")
def _check(res):
self.failUnless("Healthy :" in res, res)
d.addCallback(_check)
@@ -1806,13 +1963,14 @@
self.failUnlessEqual(statuscode, str(http.FOUND))
self.failUnlessEqual(target, redir_url)
d.addCallback(lambda res:
- self.shouldRedirect2("test_POST_DIRURL_check",
+ self.shouldRedirect2("POST_DIRURL_check-2",
_check2,
self.POST, foo_url,
t="check",
when_done=redir_url))
d.addCallback(lambda res:
- self.POST(foo_url, t="check", return_to=redir_url))
+ self.shouldSucceed("POST_DIRURL_check-3", http.OK, self.POST,
+ foo_url, t="check", return_to=redir_url))
def _check3(res):
self.failUnless("Healthy :" in res, res)
self.failUnless("Return to file/directory" in res)
@@ -1820,7 +1978,8 @@
d.addCallback(_check3)
d.addCallback(lambda res:
- self.POST(foo_url, t="check", output="JSON"))
+ self.shouldSucceed("POST_DIRURL_check-4", http.OK, self.POST,
+ foo_url, t="check", output="JSON"))
def _check_json(res):
data = simplejson.loads(res)
self.failUnless("storage-index" in data)
@@ -1831,7 +1990,8 @@
def test_POST_DIRURL_check_and_repair(self):
foo_url = self.public_url + "/foo/"
- d = self.POST(foo_url, t="check", repair="true")
+ d = self.shouldSucceed("POST_DIRURL_check_and_repair-1", http.OK, self.POST,
+ foo_url, t="check", repair="true")
def _check(res):
self.failUnless("Healthy :" in res, res)
d.addCallback(_check)
@@ -1840,13 +2000,14 @@
self.failUnlessEqual(statuscode, str(http.FOUND))
self.failUnlessEqual(target, redir_url)
d.addCallback(lambda res:
- self.shouldRedirect2("test_POST_DIRURL_check_and_repair",
+ self.shouldRedirect2("POST_DIRURL_check_and_repair-2",
_check2,
self.POST, foo_url,
t="check", repair="true",
when_done=redir_url))
d.addCallback(lambda res:
- self.POST(foo_url, t="check", return_to=redir_url))
+ self.shouldSucceed("POST_DIRURL_check_and_repair-3", http.OK, self.POST,
+ foo_url, t="check", return_to=redir_url))
def _check3(res):
self.failUnless("Healthy :" in res)
self.failUnless("Return to file/directory" in res)
@@ -1857,7 +2018,7 @@
def wait_for_operation(self, ignored, ophandle):
url = "/operations/" + ophandle
url += "?t=status&output=JSON"
- d = self.GET(url)
+ d = self.shouldSucceedGET(url)
def _got(res):
data = simplejson.loads(res)
if not data["finished"]:
@@ -1873,7 +2034,7 @@
url += "?t=status"
if output:
url += "&output=" + output
- d = self.GET(url)
+ d = self.shouldSucceedGET(url)
def _got(res):
if output and output.lower() == "json":
return simplejson.loads(res)
@@ -1883,7 +2044,7 @@
def test_POST_DIRURL_deepcheck_no_ophandle(self):
d = self.shouldFail2(error.Error,
- "test_POST_DIRURL_deepcheck_no_ophandle",
+ "POST_DIRURL_deepcheck_no_ophandle",
"400 Bad Request",
"slow operation requires ophandle=",
self.POST, self.public_url, t="start-deep-check")
@@ -1893,7 +2054,7 @@
def _check_redirect(statuscode, target):
self.failUnlessEqual(statuscode, str(http.FOUND))
self.failUnless(target.endswith("/operations/123"))
- d = self.shouldRedirect2("test_POST_DIRURL_deepcheck", _check_redirect,
+ d = self.shouldRedirect2("POST_DIRURL_deepcheck", _check_redirect,
self.POST, self.public_url,
t="start-deep-check", ophandle="123")
d.addCallback(self.wait_for_operation, "123")
@@ -1909,7 +2070,7 @@
d.addCallback(_check_html)
d.addCallback(lambda res:
- self.GET("/operations/123/"))
+ self.shouldSucceedGET("/operations/123/"))
d.addCallback(_check_html) # should be the same as without the slash
d.addCallback(lambda res:
@@ -1920,7 +2081,7 @@
foo_si = self._foo_node.get_storage_index()
foo_si_s = base32.b2a(foo_si)
d.addCallback(lambda res:
- self.GET("/operations/123/%s?output=JSON" % foo_si_s))
+ self.shouldSucceedGET("/operations/123/%s?output=JSON" % foo_si_s))
def _check_foo_json(res):
data = simplejson.loads(res)
self.failUnlessEqual(data["storage-index"], foo_si_s)
@@ -1929,8 +2090,9 @@
return d
def test_POST_DIRURL_deepcheck_and_repair(self):
- d = self.POST(self.public_url, t="start-deep-check", repair="true",
- ophandle="124", output="json", followRedirect=True)
+ d = self.shouldSucceed("POST_DIRURL_deepcheck_and_repair", http.OK, self.POST,
+ self.public_url, t="start-deep-check", repair="true",
+ ophandle="124", output="json", followRedirect=True)
d.addCallback(self.wait_for_operation, "124")
def _check_json(data):
self.failUnlessEqual(data["finished"], True)
@@ -1971,45 +2133,47 @@
return d
def test_POST_mkdir(self): # return value?
- d = self.POST(self.public_url + "/foo", t="mkdir", name="newdir")
+ d = self.shouldSucceed("POST_mkdir", http.OK, self.POST,
+ self.public_url + "/foo", t="mkdir", name="newdir")
d.addCallback(lambda res: self._foo_node.get(u"newdir"))
d.addCallback(self.failUnlessNodeKeysAre, [])
return d
def test_POST_mkdir_initial_children(self):
- newkids, filecap1, ign, ign, ign = self._create_initial_children()
- d = self.POST2(self.public_url +
- "/foo?t=mkdir-with-children&name=newdir",
- simplejson.dumps(newkids))
+ (newkids, caps) = self._create_initial_children()
+ d = self.shouldSucceed("POST_mkdir_initial_children", http.OK, self.POST2,
+ self.public_url + "/foo?t=mkdir-with-children&name=newdir",
+ simplejson.dumps(newkids))
d.addCallback(lambda res:
self.failUnlessNodeHasChild(self._foo_node, u"newdir"))
d.addCallback(lambda res: self._foo_node.get(u"newdir"))
d.addCallback(self.failUnlessNodeKeysAre, newkids.keys())
d.addCallback(lambda res: self._foo_node.get(u"newdir"))
- d.addCallback(self.failUnlessChildURIIs, u"child-imm", filecap1)
+ d.addCallback(self.failUnlessROChildURIIs, u"child-imm", caps['filecap1'])
return d
def test_POST_mkdir_immutable(self):
- (newkids, filecap1, immdircap) = self._create_immutable_children()
- d = self.POST2(self.public_url +
- "/foo?t=mkdir-immutable&name=newdir",
- simplejson.dumps(newkids))
+ (newkids, caps) = self._create_immutable_children()
+ d = self.shouldSucceed("POST_mkdir_immutable", http.OK, self.POST2,
+ self.public_url + "/foo?t=mkdir-immutable&name=newdir",
+ simplejson.dumps(newkids))
d.addCallback(lambda res:
self.failUnlessNodeHasChild(self._foo_node, u"newdir"))
d.addCallback(lambda res: self._foo_node.get(u"newdir"))
d.addCallback(self.failUnlessNodeKeysAre, newkids.keys())
d.addCallback(lambda res: self._foo_node.get(u"newdir"))
- d.addCallback(self.failUnlessChildURIIs, u"child-imm", filecap1)
+ d.addCallback(self.failUnlessROChildURIIs, u"child-imm", caps['filecap1'])
+ d.addCallback(lambda res: self._foo_node.get(u"newdir"))
+ d.addCallback(self.failUnlessROChildURIIs, u"unknownchild-imm", caps['unknown_immcap'])
d.addCallback(lambda res: self._foo_node.get(u"newdir"))
- d.addCallback(self.failUnlessChildURIIs, u"dirchild-imm", immdircap)
+ d.addCallback(self.failUnlessROChildURIIs, u"dirchild-imm", caps['immdircap'])
return d
def test_POST_mkdir_immutable_bad(self):
- (newkids, filecap1, filecap2, filecap3,
- dircap) = self._create_initial_children()
- d = self.shouldFail2(error.Error, "test_POST_mkdir_immutable_bad",
+ (newkids, caps) = self._create_initial_children()
+ d = self.shouldFail2(error.Error, "POST_mkdir_immutable_bad",
"400 Bad Request",
- "a mkdir-immutable operation was given a child that was not itself immutable",
+ "needed to be immutable but was not",
self.POST2,
self.public_url +
"/foo?t=mkdir-immutable&name=newdir",
@@ -2017,7 +2181,8 @@
return d
def test_POST_mkdir_2(self):
- d = self.POST(self.public_url + "/foo/newdir?t=mkdir", "")
+ d = self.shouldSucceed("POST_mkdir_2", http.OK, self.POST,
+ self.public_url + "/foo/newdir?t=mkdir", "")
d.addCallback(lambda res:
self.failUnlessNodeHasChild(self._foo_node, u"newdir"))
d.addCallback(lambda res: self._foo_node.get(u"newdir"))
@@ -2025,7 +2190,8 @@
return d
def test_POST_mkdirs_2(self):
- d = self.POST(self.public_url + "/foo/bardir/newdir?t=mkdir", "")
+ d = self.shouldSucceed("POST_mkdirs_2", http.OK, self.POST,
+ self.public_url + "/foo/bardir/newdir?t=mkdir", "")
d.addCallback(lambda res:
self.failUnlessNodeHasChild(self._foo_node, u"bardir"))
d.addCallback(lambda res: self._foo_node.get(u"bardir"))
@@ -2034,7 +2200,8 @@
return d
def test_POST_mkdir_no_parentdir_noredirect(self):
- d = self.POST("/uri?t=mkdir")
+ d = self.shouldSucceed("POST_mkdir_no_parentdir_noredirect", http.OK, self.POST,
+ "/uri?t=mkdir")
def _after_mkdir(res):
uri.DirectoryURI.init_from_string(res)
d.addCallback(_after_mkdir)
@@ -2049,21 +2216,43 @@
d.addCallback(_check_target)
return d
+ def _make_readonly(self, u):
+ ro_uri = uri.from_string(u).get_readonly()
+ if ro_uri is None:
+ return None
+ return ro_uri.to_string()
+
def _create_initial_children(self):
contents, n, filecap1 = self.makefile(12)
md1 = {"metakey1": "metavalue1"}
filecap2 = make_mutable_file_uri()
node3 = self.s.create_node_from_uri(make_mutable_file_uri())
filecap3 = node3.get_readonly_uri()
+ unknown_rwcap = "lafs://from_the_future"
+ unknown_rocap = "ro.lafs://readonly_from_the_future"
+ unknown_immcap = "imm.lafs://immutable_from_the_future"
node4 = self.s.create_node_from_uri(make_mutable_file_uri())
dircap = DirectoryNode(node4, None, None).get_uri()
- newkids = {u"child-imm": ["filenode", {"ro_uri": filecap1,
- "metadata": md1, }],
- u"child-mutable": ["filenode", {"rw_uri": filecap2}],
+ newkids = {u"child-imm": ["filenode", {"rw_uri": filecap1,
+ "ro_uri": self._make_readonly(filecap1),
+ "metadata": md1, }],
+ u"child-mutable": ["filenode", {"rw_uri": filecap2,
+ "ro_uri": self._make_readonly(filecap2)}],
u"child-mutable-ro": ["filenode", {"ro_uri": filecap3}],
- u"dirchild": ["dirnode", {"rw_uri": dircap}],
+ u"unknownchild-rw": ["unknown", {"rw_uri": unknown_rwcap,
+ "ro_uri": unknown_rocap}],
+ u"unknownchild-ro": ["unknown", {"ro_uri": unknown_rocap}],
+ u"unknownchild-imm": ["unknown", {"ro_uri": unknown_immcap}],
+ u"dirchild": ["dirnode", {"rw_uri": dircap,
+ "ro_uri": self._make_readonly(dircap)}],
}
- return newkids, filecap1, filecap2, filecap3, dircap
+ return newkids, {'filecap1': filecap1,
+ 'filecap2': filecap2,
+ 'filecap3': filecap3,
+ 'unknown_rwcap': unknown_rwcap,
+ 'unknown_rocap': unknown_rocap,
+ 'unknown_immcap': unknown_immcap,
+ 'dircap': dircap}
def _create_immutable_children(self):
contents, n, filecap1 = self.makefile(12)
@@ -2071,31 +2260,46 @@
tnode = create_chk_filenode("immutable directory contents\n"*10)
dnode = DirectoryNode(tnode, None, None)
assert not dnode.is_mutable()
+ unknown_immcap = "imm.lafs://immutable_from_the_future"
immdircap = dnode.get_uri()
- newkids = {u"child-imm": ["filenode", {"ro_uri": filecap1,
- "metadata": md1, }],
- u"dirchild-imm": ["dirnode", {"ro_uri": immdircap}],
+ newkids = {u"child-imm": ["filenode", {"ro_uri": filecap1,
+ "metadata": md1, }],
+ u"unknownchild-imm": ["unknown", {"ro_uri": unknown_immcap}],
+ u"dirchild-imm": ["dirnode", {"ro_uri": immdircap}],
}
- return newkids, filecap1, immdircap
+ return newkids, {'filecap1': filecap1,
+ 'unknown_immcap': unknown_immcap,
+ 'immdircap': immdircap}
def test_POST_mkdir_no_parentdir_initial_children(self):
- (newkids, filecap1, filecap2, filecap3,
- dircap) = self._create_initial_children()
- d = self.POST2("/uri?t=mkdir-with-children", simplejson.dumps(newkids))
+ (newkids, caps) = self._create_initial_children()
+ d = self.shouldSucceed("POST_mkdir_no_parentdir_initial_children", http.OK, self.POST2,
+ "/uri?t=mkdir-with-children", simplejson.dumps(newkids))
def _after_mkdir(res):
self.failUnless(res.startswith("URI:DIR"), res)
n = self.s.create_node_from_uri(res)
d2 = self.failUnlessNodeKeysAre(n, newkids.keys())
d2.addCallback(lambda ign:
- self.failUnlessChildURIIs(n, u"child-imm", filecap1))
+ self.failUnlessROChildURIIs(n, u"child-imm",
+ caps['filecap1']))
+ d2.addCallback(lambda ign:
+ self.failUnlessRWChildURIIs(n, u"child-mutable",
+ caps['filecap2']))
+ d2.addCallback(lambda ign:
+ self.failUnlessROChildURIIs(n, u"child-mutable-ro",
+ caps['filecap3']))
d2.addCallback(lambda ign:
- self.failUnlessChildURIIs(n, u"child-mutable",
- filecap2))
+ self.failUnlessRWChildURIIs(n, u"unknownchild-rw",
+ caps['unknown_rwcap']))
d2.addCallback(lambda ign:
- self.failUnlessChildURIIs(n, u"child-mutable-ro",
- filecap3))
+ self.failUnlessROChildURIIs(n, u"unknownchild-ro",
+ caps['unknown_rocap']))
d2.addCallback(lambda ign:
- self.failUnlessChildURIIs(n, u"dirchild", dircap))
+ self.failUnlessROChildURIIs(n, u"unknownchild-imm",
+ caps['unknown_immcap']))
+ d2.addCallback(lambda ign:
+ self.failUnlessRWChildURIIs(n, u"dirchild",
+ caps['dircap']))
return d2
d.addCallback(_after_mkdir)
return d
@@ -2103,8 +2307,7 @@
def test_POST_mkdir_no_parentdir_unexpected_children(self):
# the regular /uri?t=mkdir operation is specified to ignore its body.
# Only t=mkdir-with-children pays attention to it.
- (newkids, filecap1, filecap2, filecap3,
- dircap) = self._create_initial_children()
+ (newkids, caps) = self._create_initial_children()
d = self.shouldHTTPError("POST t=mkdir unexpected children",
400, "Bad Request",
"t=mkdir does not accept children=, "
@@ -2121,28 +2324,32 @@
return d
def test_POST_mkdir_no_parentdir_immutable(self):
- (newkids, filecap1, immdircap) = self._create_immutable_children()
- d = self.POST2("/uri?t=mkdir-immutable", simplejson.dumps(newkids))
+ (newkids, caps) = self._create_immutable_children()
+ d = self.shouldSucceed("POST_mkdir_no_parentdir_immutable", http.OK, self.POST2,
+ "/uri?t=mkdir-immutable", simplejson.dumps(newkids))
def _after_mkdir(res):
self.failUnless(res.startswith("URI:DIR"), res)
n = self.s.create_node_from_uri(res)
d2 = self.failUnlessNodeKeysAre(n, newkids.keys())
d2.addCallback(lambda ign:
- self.failUnlessChildURIIs(n, u"child-imm", filecap1))
+ self.failUnlessROChildURIIs(n, u"child-imm",
+ caps['filecap1']))
+ d2.addCallback(lambda ign:
+ self.failUnlessROChildURIIs(n, u"unknownchild-imm",
+ caps['unknown_immcap']))
d2.addCallback(lambda ign:
- self.failUnlessChildURIIs(n, u"dirchild-imm",
- immdircap))
+ self.failUnlessROChildURIIs(n, u"dirchild-imm",
+ caps['immdircap']))
return d2
d.addCallback(_after_mkdir)
return d
def test_POST_mkdir_no_parentdir_immutable_bad(self):
- (newkids, filecap1, filecap2, filecap3,
- dircap) = self._create_initial_children()
+ (newkids, caps) = self._create_initial_children()
d = self.shouldFail2(error.Error,
- "test_POST_mkdir_no_parentdir_immutable_bad",
+ "POST_mkdir_no_parentdir_immutable_bad",
"400 Bad Request",
- "a mkdir-immutable operation was given a child that was not itself immutable",
+ "needed to be immutable but was not",
self.POST2,
"/uri?t=mkdir-immutable",
simplejson.dumps(newkids))
@@ -2150,9 +2357,14 @@
def test_welcome_page_mkdir_button(self):
# Fetch the welcome page.
- d = self.GET("/")
+ d = self.shouldSucceedGET("/")
def _after_get_welcome_page(res):
- MKDIR_BUTTON_RE=re.compile('', re.I)
+ MKDIR_BUTTON_RE = re.compile(
+ ''
+ ''
+ '',
+ re.I)
mo = MKDIR_BUTTON_RE.search(res)
formaction = mo.group(1)
formt = mo.group(2)
@@ -2168,7 +2380,8 @@
return d
def test_POST_mkdir_replace(self): # return value?
- d = self.POST(self.public_url + "/foo", t="mkdir", name="sub")
+ d = self.shouldSucceed("POST_mkdir_replace", http.OK, self.POST,
+ self.public_url + "/foo", t="mkdir", name="sub")
d.addCallback(lambda res: self._foo_node.get(u"sub"))
d.addCallback(self.failUnlessNodeKeysAre, [])
return d
@@ -2250,9 +2463,9 @@
d = client.getPage(url, method="POST", postdata=reqbody)
def _then(res):
- self.failUnlessURIMatchesChild(newuri9, self._foo_node, u"atomic_added_1")
- self.failUnlessURIMatchesChild(newuri10, self._foo_node, u"atomic_added_2")
- self.failUnlessURIMatchesChild(newuri11, self._foo_node, u"atomic_added_3")
+ self.failUnlessURIMatchesROChild(newuri9, self._foo_node, u"atomic_added_1")
+ self.failUnlessURIMatchesROChild(newuri10, self._foo_node, u"atomic_added_2")
+ self.failUnlessURIMatchesROChild(newuri11, self._foo_node, u"atomic_added_3")
d.addCallback(_then)
d.addErrback(self.dump_error)
@@ -2260,8 +2473,9 @@
def test_POST_put_uri(self):
contents, n, newuri = self.makefile(8)
- d = self.POST(self.public_url + "/foo", t="uri", name="new.txt", uri=newuri)
- d.addCallback(self.failUnlessURIMatchesChild, self._foo_node, u"new.txt")
+ d = self.shouldSucceed("POST_put_uri", http.OK, self.POST,
+ self.public_url + "/foo", t="uri", name="new.txt", uri=newuri)
+ d.addCallback(self.failUnlessURIMatchesROChild, self._foo_node, u"new.txt")
d.addCallback(lambda res:
self.failUnlessChildContentsAre(self._foo_node, u"new.txt",
contents))
@@ -2269,8 +2483,9 @@
def test_POST_put_uri_replace(self):
contents, n, newuri = self.makefile(8)
- d = self.POST(self.public_url + "/foo", t="uri", name="bar.txt", uri=newuri)
- d.addCallback(self.failUnlessURIMatchesChild, self._foo_node, u"bar.txt")
+ d = self.shouldSucceed("POST_put_uri_replace", http.OK, self.POST,
+ self.public_url + "/foo", t="uri", name="bar.txt", uri=newuri)
+ d.addCallback(self.failUnlessURIMatchesROChild, self._foo_node, u"bar.txt")
d.addCallback(lambda res:
self.failUnlessChildContentsAre(self._foo_node, u"bar.txt",
contents))
@@ -2285,7 +2500,7 @@
"409 Conflict",
"There was already a child by that name, and you asked me "
"to not replace it")
- d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
+ d.addCallback(lambda res: self.shouldSucceedGET(self.public_url + "/foo/bar.txt"))
d.addCallback(self.failUnlessIsBarDotTxt)
return d
@@ -2298,12 +2513,13 @@
"409 Conflict",
"There was already a child by that name, and you asked me "
"to not replace it")
- d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
+ d.addCallback(lambda res: self.shouldSucceedGET(self.public_url + "/foo/bar.txt"))
d.addCallback(self.failUnlessIsBarDotTxt)
return d
def test_POST_delete(self):
- d = self.POST(self.public_url + "/foo", t="delete", name="bar.txt")
+ d = self.shouldSucceed("POST_delete", http.OK, self.POST,
+ self.public_url + "/foo", t="delete", name="bar.txt")
d.addCallback(lambda res: self._foo_node.list())
def _check(children):
self.failIf(u"bar.txt" in children)
@@ -2311,40 +2527,43 @@
return d
def test_POST_rename_file(self):
- d = self.POST(self.public_url + "/foo", t="rename",
- from_name="bar.txt", to_name='wibble.txt')
+ d = self.shouldSucceed("POST_rename_file", http.OK, self.POST,
+ self.public_url + "/foo", t="rename",
+ from_name="bar.txt", to_name='wibble.txt')
d.addCallback(lambda res:
self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
d.addCallback(lambda res:
self.failUnlessNodeHasChild(self._foo_node, u"wibble.txt"))
- d.addCallback(lambda res: self.GET(self.public_url + "/foo/wibble.txt"))
+ d.addCallback(lambda res: self.shouldSucceedGET(self.public_url + "/foo/wibble.txt"))
d.addCallback(self.failUnlessIsBarDotTxt)
- d.addCallback(lambda res: self.GET(self.public_url + "/foo/wibble.txt?t=json"))
+ d.addCallback(lambda res: self.shouldSucceedGET(self.public_url + "/foo/wibble.txt?t=json"))
d.addCallback(self.failUnlessIsBarJSON)
return d
def test_POST_rename_file_redundant(self):
- d = self.POST(self.public_url + "/foo", t="rename",
- from_name="bar.txt", to_name='bar.txt')
+ d = self.shouldSucceed("POST_rename_file_redundant", http.OK, self.POST,
+ self.public_url + "/foo", t="rename",
+ from_name="bar.txt", to_name='bar.txt')
d.addCallback(lambda res:
self.failUnlessNodeHasChild(self._foo_node, u"bar.txt"))
- d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
+ d.addCallback(lambda res: self.shouldSucceedGET(self.public_url + "/foo/bar.txt"))
d.addCallback(self.failUnlessIsBarDotTxt)
- d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt?t=json"))
+ d.addCallback(lambda res: self.shouldSucceedGET(self.public_url + "/foo/bar.txt?t=json"))
d.addCallback(self.failUnlessIsBarJSON)
return d
def test_POST_rename_file_replace(self):
# rename a file and replace a directory with it
- d = self.POST(self.public_url + "/foo", t="rename",
- from_name="bar.txt", to_name='empty')
+ d = self.shouldSucceed("POST_rename_file_replace", http.OK, self.POST,
+ self.public_url + "/foo", t="rename",
+ from_name="bar.txt", to_name='empty')
d.addCallback(lambda res:
self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
d.addCallback(lambda res:
self.failUnlessNodeHasChild(self._foo_node, u"empty"))
- d.addCallback(lambda res: self.GET(self.public_url + "/foo/empty"))
+ d.addCallback(lambda res: self.shouldSucceedGET(self.public_url + "/foo/empty"))
d.addCallback(self.failUnlessIsBarDotTxt)
- d.addCallback(lambda res: self.GET(self.public_url + "/foo/empty?t=json"))
+ d.addCallback(lambda res: self.shouldSucceedGET(self.public_url + "/foo/empty?t=json"))
d.addCallback(self.failUnlessIsBarJSON)
return d
@@ -2357,7 +2576,7 @@
"409 Conflict",
"There was already a child by that name, and you asked me "
"to not replace it")
- d.addCallback(lambda res: self.GET(self.public_url + "/foo/empty?t=json"))
+ d.addCallback(lambda res: self.shouldSucceedGET(self.public_url + "/foo/empty?t=json"))
d.addCallback(self.failUnlessIsEmptyJSON)
return d
@@ -2370,7 +2589,7 @@
"409 Conflict",
"There was already a child by that name, and you asked me "
"to not replace it")
- d.addCallback(lambda res: self.GET(self.public_url + "/foo/empty?t=json"))
+ d.addCallback(lambda res: self.shouldSucceedGET(self.public_url + "/foo/empty?t=json"))
d.addCallback(self.failUnlessIsEmptyJSON)
return d
@@ -2383,7 +2602,7 @@
d = self.POST(self.public_url + "/foo", t="rename",
from_name="bar.txt", to_name='kirk/spock.txt')
d.addBoth(self.shouldFail, error.Error,
- "test_POST_rename_file_slash_fail",
+ "POST_rename_file_slash_fail",
"400 Bad Request",
"to_name= may not contain a slash",
)
@@ -2392,13 +2611,14 @@
return d
def test_POST_rename_dir(self):
- d = self.POST(self.public_url, t="rename",
- from_name="foo", to_name='plunk')
+ d = self.shouldSucceed("POST_rename_dir", http.OK, self.POST,
+ self.public_url, t="rename",
+ from_name="foo", to_name='plunk')
d.addCallback(lambda res:
self.failIfNodeHasChild(self.public_root, u"foo"))
d.addCallback(lambda res:
self.failUnlessNodeHasChild(self.public_root, u"plunk"))
- d.addCallback(lambda res: self.GET(self.public_url + "/plunk?t=json"))
+ d.addCallback(lambda res: self.shouldSucceedGET(self.public_url + "/plunk?t=json"))
d.addCallback(self.failUnlessIsFooJSON)
return d
@@ -2433,24 +2653,24 @@
d.addCallback(lambda res: self.GET(base+"&t=json"))
d.addBoth(self.shouldRedirect, targetbase+"?t=json")
d.addCallback(self.log, "about to get file by uri")
- d.addCallback(lambda res: self.GET(base, followRedirect=True))
+ d.addCallback(lambda res: self.shouldSucceedGET(base, followRedirect=True))
d.addCallback(self.failUnlessIsBarDotTxt)
d.addCallback(self.log, "got file by uri, about to get dir by uri")
- d.addCallback(lambda res: self.GET("/uri?uri=%s&t=json" % self._foo_uri,
- followRedirect=True))
+ d.addCallback(lambda res: self.shouldSucceedGET("/uri?uri=%s&t=json" % self._foo_uri,
+ followRedirect=True))
d.addCallback(self.failUnlessIsFooJSON)
d.addCallback(self.log, "got dir by uri")
return d
def test_GET_URI_form_bad(self):
- d = self.shouldFail2(error.Error, "test_GET_URI_form_bad",
+ d = self.shouldFail2(error.Error, "GET_URI_form_bad",
"400 Bad Request", "GET /uri requires uri=",
self.GET, "/uri")
return d
def test_GET_rename_form(self):
- d = self.GET(self.public_url + "/foo?t=rename-form&name=bar.txt",
+ d = self.shouldSucceedGET(self.public_url + "/foo?t=rename-form&name=bar.txt",
followRedirect=True)
def _check(res):
self.failUnless('name="when_done" value="."' in res, res)
@@ -2465,23 +2685,23 @@
def test_GET_URI_URL(self):
base = "/uri/%s" % self._bar_txt_uri
- d = self.GET(base)
+ d = self.shouldSucceedGET(base)
d.addCallback(self.failUnlessIsBarDotTxt)
- d.addCallback(lambda res: self.GET(base+"?filename=bar.txt"))
+ d.addCallback(lambda res: self.shouldSucceedGET(base+"?filename=bar.txt"))
d.addCallback(self.failUnlessIsBarDotTxt)
- d.addCallback(lambda res: self.GET(base+"?filename=bar.txt&save=true"))
+ d.addCallback(lambda res: self.shouldSucceedGET(base+"?filename=bar.txt&save=true"))
d.addCallback(self.failUnlessIsBarDotTxt)
return d
def test_GET_URI_URL_dir(self):
base = "/uri/%s?t=json" % self._foo_uri
- d = self.GET(base)
+ d = self.shouldSucceedGET(base)
d.addCallback(self.failUnlessIsFooJSON)
return d
def test_GET_URI_URL_missing(self):
base = "/uri/%s" % self._bad_file_uri
- d = self.shouldHTTPError("test_GET_URI_URL_missing",
+ d = self.shouldHTTPError("GET_URI_URL_missing",
http.GONE, None, "NotEnoughSharesError",
self.GET, base)
# TODO: how can we exercise both sides of WebDownloadTarget.fail
@@ -2499,9 +2719,9 @@
d.addCallback(lambda res:
self.failUnlessEqual(res.strip(), new_uri))
d.addCallback(lambda res:
- self.failUnlessChildURIIs(self.public_root,
- u"foo",
- new_uri))
+ self.failUnlessRWChildURIIs(self.public_root,
+ u"foo",
+ new_uri))
return d
d.addCallback(_made_dir)
return d
@@ -2512,32 +2732,33 @@
new_uri = dn.get_uri()
# replace /foo with a new (empty) directory, but ask that
# replace=false, so it should fail
- d = self.shouldFail2(error.Error, "test_PUT_DIRURL_uri_noreplace",
+ d = self.shouldFail2(error.Error, "PUT_DIRURL_uri_noreplace",
"409 Conflict", "There was already a child by that name, and you asked me to not replace it",
self.PUT,
self.public_url + "/foo?t=uri&replace=false",
new_uri)
d.addCallback(lambda res:
- self.failUnlessChildURIIs(self.public_root,
- u"foo",
- self._foo_uri))
+ self.failUnlessRWChildURIIs(self.public_root,
+ u"foo",
+ self._foo_uri))
return d
d.addCallback(_made_dir)
return d
def test_PUT_DIRURL_bad_t(self):
- d = self.shouldFail2(error.Error, "test_PUT_DIRURL_bad_t",
+ d = self.shouldFail2(error.Error, "PUT_DIRURL_bad_t",
"400 Bad Request", "PUT to a directory",
self.PUT, self.public_url + "/foo?t=BOGUS", "")
d.addCallback(lambda res:
- self.failUnlessChildURIIs(self.public_root,
- u"foo",
- self._foo_uri))
+ self.failUnlessRWChildURIIs(self.public_root,
+ u"foo",
+ self._foo_uri))
return d
def test_PUT_NEWFILEURL_uri(self):
contents, n, new_uri = self.makefile(8)
- d = self.PUT(self.public_url + "/foo/new.txt?t=uri", new_uri)
+ d = self.shouldSucceed("PUT_NEWFILEURL_uri", http.OK, self.PUT,
+ self.public_url + "/foo/new.txt?t=uri", new_uri)
d.addCallback(lambda res: self.failUnlessEqual(res.strip(), new_uri))
d.addCallback(lambda res:
self.failUnlessChildContentsAre(self._foo_node, u"new.txt",
@@ -2564,13 +2785,14 @@
def test_PUT_NEWFILE_URI(self):
file_contents = "New file contents here\n"
- d = self.PUT("/uri", file_contents)
+ d = self.shouldSucceed("PUT_NEWFILE_URI", http.OK, self.PUT,
+ "/uri", file_contents)
def _check(uri):
assert isinstance(uri, str), uri
self.failUnless(uri in FakeCHKFileNode.all_contents)
self.failUnlessEqual(FakeCHKFileNode.all_contents[uri],
file_contents)
- return self.GET("/uri/%s" % uri)
+ return self.shouldSucceedGET("/uri/%s" % uri)
d.addCallback(_check)
def _check2(res):
self.failUnlessEqual(res, file_contents)
@@ -2579,13 +2801,14 @@
def test_PUT_NEWFILE_URI_not_mutable(self):
file_contents = "New file contents here\n"
- d = self.PUT("/uri?mutable=false", file_contents)
+ d = self.shouldSucceed("PUT_NEWFILE_URI_not_mutable", http.OK, self.PUT,
+ "/uri?mutable=false", file_contents)
def _check(uri):
assert isinstance(uri, str), uri
self.failUnless(uri in FakeCHKFileNode.all_contents)
self.failUnlessEqual(FakeCHKFileNode.all_contents[uri],
file_contents)
- return self.GET("/uri/%s" % uri)
+ return self.shouldSucceedGET("/uri/%s" % uri)
d.addCallback(_check)
def _check2(res):
self.failUnlessEqual(res, file_contents)
@@ -2602,7 +2825,8 @@
def test_PUT_NEWFILE_URI_mutable(self):
file_contents = "New file contents here\n"
- d = self.PUT("/uri?mutable=true", file_contents)
+ d = self.shouldSucceed("PUT_NEWFILE_URI_mutable", http.OK, self.PUT,
+ "/uri?mutable=true", file_contents)
def _check1(filecap):
filecap = filecap.strip()
self.failUnless(filecap.startswith("URI:SSK:"), filecap)
@@ -2614,7 +2838,7 @@
d.addCallback(_check1)
def _check2(data):
self.failUnlessEqual(data, file_contents)
- return self.GET("/uri/%s" % urllib.quote(self.filecap))
+ return self.shouldSucceedGET("/uri/%s" % urllib.quote(self.filecap))
d.addCallback(_check2)
def _check3(res):
self.failUnlessEqual(res, file_contents)
@@ -2622,19 +2846,21 @@
return d
def test_PUT_mkdir(self):
- d = self.PUT("/uri?t=mkdir", "")
+ d = self.shouldSucceed("PUT_mkdir", http.OK, self.PUT,
+ "/uri?t=mkdir", "")
def _check(uri):
n = self.s.create_node_from_uri(uri.strip())
d2 = self.failUnlessNodeKeysAre(n, [])
d2.addCallback(lambda res:
- self.GET("/uri/%s?t=json" % uri))
+ self.shouldSucceedGET("/uri/%s?t=json" % uri))
return d2
d.addCallback(_check)
d.addCallback(self.failUnlessIsEmptyJSON)
return d
def test_POST_check(self):
- d = self.POST(self.public_url + "/foo", t="check", name="bar.txt")
+ d = self.shouldSucceed("POST_check", http.OK, self.POST,
+ self.public_url + "/foo", t="check", name="bar.txt")
def _done(res):
# this returns a string form of the results, which are probably
# None since we're using fake filenodes.
@@ -2647,7 +2873,7 @@
def test_bad_method(self):
url = self.webish_url + self.public_url + "/foo/bar.txt"
- d = self.shouldHTTPError("test_bad_method",
+ d = self.shouldHTTPError("bad_method",
501, "Not Implemented",
"I don't know how to treat a BOGUS request.",
client.getPage, url, method="BOGUS")
@@ -2655,28 +2881,30 @@
def test_short_url(self):
url = self.webish_url + "/uri"
- d = self.shouldHTTPError("test_short_url", 501, "Not Implemented",
+ d = self.shouldHTTPError("short_url", 501, "Not Implemented",
"I don't know how to treat a DELETE request.",
client.getPage, url, method="DELETE")
return d
def test_ophandle_bad(self):
url = self.webish_url + "/operations/bogus?t=status"
- d = self.shouldHTTPError("test_ophandle_bad", 404, "404 Not Found",
+ d = self.shouldHTTPError("ophandle_bad", 404, "404 Not Found",
"unknown/expired handle 'bogus'",
client.getPage, url)
return d
def test_ophandle_cancel(self):
- d = self.POST(self.public_url + "/foo/?t=start-manifest&ophandle=128",
- followRedirect=True)
+ d = self.shouldSucceed("ophandle_cancel-1", http.OK, self.POST,
+ self.public_url + "/foo/?t=start-manifest&ophandle=128",
+ followRedirect=True)
d.addCallback(lambda ignored:
- self.GET("/operations/128?t=status&output=JSON"))
+ self.shouldSucceedGET("/operations/128?t=status&output=JSON"))
def _check1(res):
data = simplejson.loads(res)
self.failUnless("finished" in data, res)
monitor = self.ws.root.child_operations.handles["128"][0]
- d = self.POST("/operations/128?t=cancel&output=JSON")
+ d = self.shouldSucceed("ophandle_cancel-2", http.OK, self.POST,
+ "/operations/128?t=cancel&output=JSON")
def _check2(res):
data = simplejson.loads(res)
self.failUnless("finished" in data, res)
@@ -2686,7 +2914,7 @@
return d
d.addCallback(_check1)
d.addCallback(lambda ignored:
- self.shouldHTTPError("test_ophandle_cancel",
+ self.shouldHTTPError("ophandle_cancel",
404, "404 Not Found",
"unknown/expired handle '128'",
self.GET,
@@ -2697,7 +2925,7 @@
d = self.POST(self.public_url + "/foo/?t=start-manifest&ophandle=129&retain-for=60",
followRedirect=True)
d.addCallback(lambda ignored:
- self.GET("/operations/129?t=status&output=JSON&retain-for=0"))
+ self.shouldSucceedGET("/operations/129?t=status&output=JSON&retain-for=0"))
def _check1(res):
data = simplejson.loads(res)
self.failUnless("finished" in data, res)
@@ -2705,7 +2933,7 @@
# the retain-for=0 will cause the handle to be expired very soon
d.addCallback(self.stall, 2.0)
d.addCallback(lambda ignored:
- self.shouldHTTPError("test_ophandle_retainfor",
+ self.shouldHTTPError("ophandle_retainfor",
404, "404 Not Found",
"unknown/expired handle '129'",
self.GET,
@@ -2713,14 +2941,15 @@
return d
def test_ophandle_release_after_complete(self):
- d = self.POST(self.public_url + "/foo/?t=start-manifest&ophandle=130",
- followRedirect=True)
+ d = self.shouldSucceed("ophandle_release_after_complete", http.OK, self.POST,
+ self.public_url + "/foo/?t=start-manifest&ophandle=130",
+ followRedirect=True)
d.addCallback(self.wait_for_operation, "130")
d.addCallback(lambda ignored:
- self.GET("/operations/130?t=status&output=JSON&release-after-complete=true"))
+ self.shouldSucceedGET("/operations/130?t=status&output=JSON&release-after-complete=true"))
# the release-after-complete=true will cause the handle to be expired
d.addCallback(lambda ignored:
- self.shouldHTTPError("test_ophandle_release_after_complete",
+ self.shouldHTTPError("ophandle_release_after_complete",
404, "404 Not Found",
"unknown/expired handle '130'",
self.GET,
@@ -2728,7 +2957,8 @@
return d
def test_incident(self):
- d = self.POST("/report_incident", details="eek")
+ d = self.shouldSucceed("incident", http.OK, self.POST,
+ "/report_incident", details="eek")
def _done(res):
self.failUnless("Thank you for your report!" in res, res)
d.addCallback(_done)
@@ -2741,7 +2971,7 @@
f.write("hello")
f.close()
- d = self.GET("/static/subdir/hello.txt")
+ d = self.shouldSucceedGET("/static/subdir/hello.txt")
def _check(res):
self.failUnlessEqual(res, "hello")
d.addCallback(_check)
@@ -2754,7 +2984,7 @@
self.failUnlessEqual(common.parse_replace_arg("false"), False)
self.failUnlessEqual(common.parse_replace_arg("only-files"),
"only-files")
- self.shouldFail(AssertionError, "test_parse_replace_arg", "",
+ self.shouldFail(AssertionError, "parse_replace_arg", "",
common.parse_replace_arg, "only_fles")
def test_abbreviate_time(self):
@@ -3059,71 +3289,225 @@
d.addErrback(self.explain_web_error)
return d
- def test_unknown(self):
+ def test_unknown(self, immutable=False):
self.basedir = "web/Grid/unknown"
+ if immutable:
+ self.basedir = "web/Grid/unknown-immutable"
+
self.set_up_grid()
c0 = self.g.clients[0]
self.uris = {}
self.fileurls = {}
- future_writecap = "x-tahoe-crazy://I_am_from_the_future."
- future_readcap = "x-tahoe-crazy-readonly://I_am_from_the_future."
+ future_write_uri = "x-tahoe-crazy://I_am_from_the_future."
+ future_read_uri = "x-tahoe-crazy-readonly://I_am_from_the_future."
# the future cap format may contain slashes, which must be tolerated
- expected_info_url = "uri/%s?t=info" % urllib.quote(future_writecap,
+ expected_info_url = "uri/%s?t=info" % urllib.quote(future_write_uri,
safe="")
- future_node = UnknownNode(future_writecap, future_readcap)
- d = c0.create_dirnode()
+ if immutable:
+ name = u"future-imm"
+ future_node = UnknownNode(None, future_read_uri, deep_immutable=True)
+ d = c0.create_immutable_dirnode({name: (future_node, {})})
+ else:
+ name = u"future"
+ future_node = UnknownNode(future_write_uri, future_read_uri)
+ d = c0.create_dirnode()
+
def _stash_root_and_create_file(n):
self.rootnode = n
self.rooturl = "uri/" + urllib.quote(n.get_uri()) + "/"
self.rourl = "uri/" + urllib.quote(n.get_readonly_uri()) + "/"
- return self.rootnode.set_node(u"future", future_node)
+ if not immutable:
+ return self.rootnode.set_node(name, future_node)
d.addCallback(_stash_root_and_create_file)
+
# make sure directory listing tolerates unknown nodes
d.addCallback(lambda ign: self.GET(self.rooturl))
def _check_html(res):
- self.failUnlessIn("
future
", res)
- # find the More Info link for "future", should be relative
+ self.failUnlessIn("
%s
" % (str(name),), res)
+ # find the More Info link for name, should be relative
mo = re.search(r'More Info', res)
info_url = mo.group(1)
- self.failUnlessEqual(info_url, "future?t=info")
+ self.failUnlessEqual(info_url, "%s?t=info" % (str(name),))
d.addCallback(_check_html)
d.addCallback(lambda ign: self.GET(self.rooturl+"?t=json"))
- def _check_json(res, expect_writecap):
+ def _check_json(res, expect_rw_uri):
data = simplejson.loads(res)
self.failUnlessEqual(data[0], "dirnode")
- f = data[1]["children"]["future"]
+ f = data[1]["children"][name]
self.failUnlessEqual(f[0], "unknown")
- if expect_writecap:
- self.failUnlessEqual(f[1]["rw_uri"], future_writecap)
+ if expect_rw_uri:
+ self.failUnlessEqual(f[1]["rw_uri"], future_write_uri)
else:
self.failIfIn("rw_uri", f[1])
- self.failUnlessEqual(f[1]["ro_uri"], future_readcap)
+ self.failUnlessEqual(f[1]["ro_uri"],
+ ("imm." if immutable else "ro.") + future_read_uri)
self.failUnless("metadata" in f[1])
- d.addCallback(_check_json, expect_writecap=True)
- d.addCallback(lambda ign: self.GET(expected_info_url))
- def _check_info(res, expect_readcap):
+ d.addCallback(_check_json, expect_rw_uri=not immutable)
+
+ def _check_info(res, expect_rw_uri, expect_ro_uri):
self.failUnlessIn("Object Type: unknown", res)
- self.failUnlessIn(future_writecap, res)
- if expect_readcap:
- self.failUnlessIn(future_readcap, res)
+ if expect_rw_uri:
+ self.failUnlessIn(future_write_uri, res)
+ if expect_ro_uri:
+ self.failUnlessIn(future_read_uri, res)
+ else:
+ self.failIfIn(future_read_uri, res)
self.failIfIn("Raw data as", res)
self.failIfIn("Directory writecap", res)
self.failIfIn("Checker Operations", res)
self.failIfIn("Mutable File Operations", res)
self.failIfIn("Directory Operations", res)
- d.addCallback(_check_info, expect_readcap=False)
- d.addCallback(lambda ign: self.GET(self.rooturl+"future?t=info"))
- d.addCallback(_check_info, expect_readcap=True)
+
+ # Known bug: these should have expect_rw_uri=not immutable, but the
+ # info pages are currently broken. Related to ticket #922.
+
+ d.addCallback(lambda ign: self.GET(expected_info_url))
+ d.addCallback(_check_info, expect_rw_uri=False, expect_ro_uri=False)
+ d.addCallback(lambda ign: self.GET("%s%s?t=info" % (self.rooturl, str(name))))
+ d.addCallback(_check_info, expect_rw_uri=False, expect_ro_uri=True)
# and make sure that a read-only version of the directory can be
- # rendered too. This version will not have future_writecap
+ # rendered too. This version will not have future_write_uri, whether
+ # or not future_node was immutable.
d.addCallback(lambda ign: self.GET(self.rourl))
d.addCallback(_check_html)
d.addCallback(lambda ign: self.GET(self.rourl+"?t=json"))
- d.addCallback(_check_json, expect_writecap=False)
+ d.addCallback(_check_json, expect_rw_uri=False)
+ return d
+
+ def test_immutable_unknown(self):
+ return self.test_unknown(immutable=True)
+
+ def test_mutant_dirnodes_are_omitted(self):
+ self.basedir = "web/Grid/mutant_dirnodes_are_omitted"
+
+ self.set_up_grid()
+ c = self.g.clients[0]
+ nm = c.nodemaker
+ self.uris = {}
+ self.fileurls = {}
+
+ lonely_uri = "URI:LIT:n5xgk" # LIT for "one"
+ mut_write_uri = "URI:SSK:vfvcbdfbszyrsaxchgevhmmlii:euw4iw7bbnkrrwpzuburbhppuxhc3gwxv26f6imekhz7zyw2ojnq"
+ mut_read_uri = "URI:SSK-RO:e3mdrzfwhoq42hy5ubcz6rp3o4:ybyibhnp3vvwuq2vaw2ckjmesgkklfs6ghxleztqidihjyofgw7q"
+
+ # This method tests mainly dirnode, but we'd have to duplicate code in order to
+ # test the dirnode and web layers separately.
+
+ # 'lonely' is a valid LIT child, 'ro' is a mutant child with an SSK-RO readcap,
+ # and 'write-in-ro' is a mutant child with an SSK writecap in the ro_uri field.
+ # When the directory is read, the mutants should be silently disposed of, leaving
+ # their lonely sibling.
+ # We don't test the case of retrieving a cap from the encrypted rw_uri field,
+ # because immutable directories don't have a writecap and therefore that field
+ # isn't (and can't be) decrypted.
+ # TODO: The field still exists in the netstring. Technically we should check what
+ # happens if something is put there (it should be ignored), but that can wait.
+
+ lonely_child = nm.create_from_cap(lonely_uri)
+ mutant_ro_child = nm.create_from_cap(mut_read_uri)
+ mutant_write_in_ro_child = nm.create_from_cap(mut_write_uri)
+
+ def _by_hook_or_by_crook():
+ return True
+ for n in [mutant_ro_child, mutant_write_in_ro_child]:
+ n.is_allowed_in_immutable_directory = _by_hook_or_by_crook
+
+ mutant_write_in_ro_child.get_write_uri = lambda: None
+ mutant_write_in_ro_child.get_readonly_uri = lambda: mut_write_uri
+
+ kids = {u"lonely": (lonely_child, {}),
+ u"ro": (mutant_ro_child, {}),
+ u"write-in-ro": (mutant_write_in_ro_child, {}),
+ }
+ d = c.create_immutable_dirnode(kids)
+
+ def _created(dn):
+ self.failUnless(isinstance(dn, dirnode.DirectoryNode))
+ self.failIf(dn.is_mutable())
+ self.failUnless(dn.is_readonly())
+ # This checks that if we somehow ended up calling dn._decrypt_rwcapdata, it would fail.
+ self.failIf(hasattr(dn._node, 'get_writekey'))
+ rep = str(dn)
+ self.failUnless("RO-IMM" in rep)
+ cap = dn.get_cap()
+ self.failUnlessIn("CHK", cap.to_string())
+ self.cap = cap
+ self.rootnode = dn
+ self.rooturl = "uri/" + urllib.quote(dn.get_uri()) + "/"
+ return download_to_data(dn._node)
+ d.addCallback(_created)
+
+ def _check_data(data):
+ # Decode the netstring representation of the directory to check that all children
+ # are present. This is a bit of an abstraction violation, but there's not really
+ # any other way to do it given that the real DirectoryNode._unpack_contents would
+ # strip the mutant children out (which is what we're trying to test, later).
+ position = 0
+ numkids = 0
+ while position < len(data):
+ entries, position = split_netstring(data, 1, position)
+ entry = entries[0]
+ (name, ro_uri, rwcapdata, metadata_s), subpos = split_netstring(entry, 4)
+ name = name.decode("utf-8")
+ self.failUnless(rwcapdata == "")
+ ro_uri = ro_uri.strip()
+ if name in kids:
+ self.failIfEqual(ro_uri, "")
+ (expected_child, ign) = kids[name]
+ self.failUnlessEqual(ro_uri, expected_child.get_readonly_uri())
+ numkids += 1
+
+ self.failUnlessEqual(numkids, 3)
+ return self.rootnode.list()
+ d.addCallback(_check_data)
+
+ # Now when we use the real directory listing code, the mutants should be absent.
+ def _check_kids(children):
+ self.failUnlessEqual(sorted(children.keys()), [u"lonely"])
+ lonely_node, lonely_metadata = children[u"lonely"]
+
+ self.failUnlessEqual(lonely_node.get_write_uri(), None)
+ self.failUnlessEqual(lonely_node.get_readonly_uri(), lonely_uri)
+ d.addCallback(_check_kids)
+
+ d.addCallback(lambda ign: nm.create_from_cap(self.cap.to_string()))
+ d.addCallback(lambda n: n.list())
+ d.addCallback(_check_kids) # again with dirnode recreated from cap
+
+ # Make sure the lonely child can be listed in HTML...
+ d.addCallback(lambda ign: self.GET(self.rooturl))
+ def _check_html(res):
+ self.failIfIn("URI:SSK", res)
+ get_lonely = "".join([r'