source: trunk/src/allmydata/scripts/backupdb.py

Last change on this file was fec97256, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2025-01-06T21:51:37Z

trim Python2 syntax

  • Property mode set to 100644
File size: 12.6 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5import os.path, sys, time, random, stat
6
7from allmydata.util.netstring import netstring
8from allmydata.util.hashutil import backupdb_dirhash
9from allmydata.util import base32
10from allmydata.util.fileutil import abspath_expanduser_unicode
11from allmydata.util.encodingutil import to_bytes
12from allmydata.util.dbutil import get_db, DBError
13
14
15DAY = 24*60*60
16MONTH = 30*DAY
17
18SCHEMA_v1 = """
19CREATE TABLE version -- added in v1
20(
21 version INTEGER  -- contains one row, set to 2
22);
23
24CREATE TABLE local_files -- added in v1
25(
26 path  VARCHAR(1024) PRIMARY KEY, -- index, this is an absolute UTF-8-encoded local filename
27 size  INTEGER,       -- os.stat(fn)[stat.ST_SIZE]
28 mtime NUMBER,        -- os.stat(fn)[stat.ST_MTIME]
29 ctime NUMBER,        -- os.stat(fn)[stat.ST_CTIME]
30 fileid INTEGER
31);
32
33CREATE TABLE caps -- added in v1
34(
35 fileid INTEGER PRIMARY KEY AUTOINCREMENT,
36 filecap VARCHAR(256) UNIQUE       -- URI:CHK:...
37);
38
39CREATE TABLE last_upload -- added in v1
40(
41 fileid INTEGER PRIMARY KEY,
42 last_uploaded TIMESTAMP,
43 last_checked TIMESTAMP
44);
45
46"""
47
48TABLE_DIRECTORY = """
49
50CREATE TABLE directories -- added in v2
51(
52 dirhash varchar(256) PRIMARY KEY,  -- base32(dirhash)
53 dircap varchar(256),               -- URI:DIR2-CHK:...
54 last_uploaded TIMESTAMP,
55 last_checked TIMESTAMP
56);
57
58"""
59
60SCHEMA_v2 = SCHEMA_v1 + TABLE_DIRECTORY
61
62UPDATE_v1_to_v2 = TABLE_DIRECTORY + """
63UPDATE version SET version=2;
64"""
65
66UPDATERS = {
67    2: UPDATE_v1_to_v2,
68}
69
70def get_backupdb(dbfile, stderr=sys.stderr,
71                 create_version=(SCHEMA_v2, 2), just_create=False):
72    # Open or create the given backupdb file. The parent directory must
73    # exist.
74    try:
75        (sqlite3, db) = get_db(dbfile, stderr, create_version, updaters=UPDATERS,
76                               just_create=just_create, dbname="backupdb")
77        return BackupDB_v2(sqlite3, db)
78    except DBError as e:
79        print(e, file=stderr)
80        return None
81
82
83class FileResult:
84    def __init__(self, bdb, filecap, should_check,
85                 path, mtime, ctime, size):
86        self.bdb = bdb
87        self.filecap = filecap
88        self.should_check_p = should_check
89
90        self.path = path
91        self.mtime = mtime
92        self.ctime = ctime
93        self.size = size
94
95    def was_uploaded(self):
96        if self.filecap:
97            return self.filecap
98        return False
99
100    def did_upload(self, filecap):
101        self.bdb.did_upload_file(filecap, self.path,
102                                 self.mtime, self.ctime, self.size)
103
104    def should_check(self):
105        return self.should_check_p
106
107    def did_check_healthy(self, results):
108        self.bdb.did_check_file_healthy(self.filecap, results)
109
110
111class DirectoryResult:
112    def __init__(self, bdb, dirhash, dircap, should_check):
113        self.bdb = bdb
114        self.dircap = dircap
115        self.should_check_p = should_check
116        self.dirhash = dirhash
117
118    def was_created(self):
119        if self.dircap:
120            return self.dircap
121        return False
122
123    def did_create(self, dircap):
124        self.bdb.did_create_directory(dircap, self.dirhash)
125
126    def should_check(self):
127        return self.should_check_p
128
129    def did_check_healthy(self, results):
130        self.bdb.did_check_directory_healthy(self.dircap, results)
131
132
133class BackupDB_v2:
134    VERSION = 2
135    NO_CHECK_BEFORE = 1*MONTH
136    ALWAYS_CHECK_AFTER = 2*MONTH
137
138    def __init__(self, sqlite_module, connection):
139        self.sqlite_module = sqlite_module
140        self.connection = connection
141        self.cursor = connection.cursor()
142
143    def check_file(self, path, use_timestamps=True):
144        """I will tell you if a given local file needs to be uploaded or not,
145        by looking in a database and seeing if I have a record of this file
146        having been uploaded earlier.
147
148        I return a FileResults object, synchronously. If r.was_uploaded()
149        returns False, you should upload the file. When you are finished
150        uploading it, call r.did_upload(filecap), so I can update my
151        database.
152
153        If was_uploaded() returns a filecap, you might be able to avoid an
154        upload. Call r.should_check(), and if it says False, you can skip the
155        upload and use the filecap returned by was_uploaded().
156
157        If should_check() returns True, you should perform a filecheck on the
158        filecap returned by was_uploaded(). If the check indicates the file
159        is healthy, please call r.did_check_healthy(checker_results) so I can
160        update the database, using the de-JSONized response from the webapi
161        t=check call for 'checker_results'. If the check indicates the file
162        is not healthy, please upload the file and call r.did_upload(filecap)
163        when you're done.
164
165        If use_timestamps=True (the default), I will compare ctime and mtime
166        of the local file against an entry in my database, and consider the
167        file to be unchanged if ctime, mtime, and filesize are all the same
168        as the earlier version. If use_timestamps=False, I will not trust the
169        timestamps, so more files (perhaps all) will be marked as needing
170        upload. A future version of this database may hash the file to make
171        equality decisions, in which case use_timestamps=False will not
172        always imply r.must_upload()==True.
173
174        'path' points to a local file on disk, possibly relative to the
175        current working directory. The database stores absolute pathnames.
176        """
177
178        path = abspath_expanduser_unicode(path)
179
180        # TODO: consider using get_pathinfo.
181        s = os.stat(path)
182        size = s[stat.ST_SIZE]
183        ctime = s[stat.ST_CTIME]
184        mtime = s[stat.ST_MTIME]
185
186        now = time.time()
187        c = self.cursor
188
189        c.execute("SELECT size,mtime,ctime,fileid"
190                  " FROM local_files"
191                  " WHERE path=?",
192                  (path,))
193        row = self.cursor.fetchone()
194        if not row:
195            return FileResult(self, None, False, path, mtime, ctime, size)
196        (last_size,last_mtime,last_ctime,last_fileid) = row
197
198        c.execute("SELECT caps.filecap, last_upload.last_checked"
199                  " FROM caps,last_upload"
200                  " WHERE caps.fileid=? AND last_upload.fileid=?",
201                  (last_fileid, last_fileid))
202        row2 = c.fetchone()
203
204        if ((last_size != size
205             or not use_timestamps
206             or last_mtime != mtime
207             or last_ctime != ctime) # the file has been changed
208            or (not row2) # we somehow forgot where we put the file last time
209            ):
210            c.execute("DELETE FROM local_files WHERE path=?", (path,))
211            self.connection.commit()
212            return FileResult(self, None, False, path, mtime, ctime, size)
213
214        # at this point, we're allowed to assume the file hasn't been changed
215        (filecap, last_checked) = row2
216        age = now - last_checked
217
218        probability = ((age - self.NO_CHECK_BEFORE) /
219                       (self.ALWAYS_CHECK_AFTER - self.NO_CHECK_BEFORE))
220        probability = min(max(probability, 0.0), 1.0)
221        should_check = bool(random.random() < probability)
222
223        return FileResult(self, to_bytes(filecap), should_check,
224                          path, mtime, ctime, size)
225
226    def get_or_allocate_fileid_for_cap(self, filecap):
227        # find an existing fileid for this filecap, or insert a new one. The
228        # caller is required to commit() afterwards.
229
230        # mysql has "INSERT ... ON DUPLICATE KEY UPDATE", but not sqlite
231        # sqlite has "INSERT ON CONFLICT REPLACE", but not mysql
232        # So we use INSERT, ignore any error, then a SELECT
233        c = self.cursor
234        try:
235            c.execute("INSERT INTO caps (filecap) VALUES (?)", (filecap,))
236        except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
237            # sqlite3 on sid gives IntegrityError
238            # pysqlite2 (which we don't use, so maybe no longer relevant) on dapper gives OperationalError
239            pass
240        c.execute("SELECT fileid FROM caps WHERE filecap=?", (filecap,))
241        foundrow = c.fetchone()
242        assert foundrow
243        fileid = foundrow[0]
244        return fileid
245
246    def did_upload_file(self, filecap, path, mtime, ctime, size):
247        now = time.time()
248        fileid = self.get_or_allocate_fileid_for_cap(filecap)
249        try:
250            self.cursor.execute("INSERT INTO last_upload VALUES (?,?,?)",
251                                (fileid, now, now))
252        except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
253            self.cursor.execute("UPDATE last_upload"
254                                " SET last_uploaded=?, last_checked=?"
255                                " WHERE fileid=?",
256                                (now, now, fileid))
257        try:
258            self.cursor.execute("INSERT INTO local_files VALUES (?,?,?,?,?)",
259                                (path, size, mtime, ctime, fileid))
260        except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
261            self.cursor.execute("UPDATE local_files"
262                                " SET size=?, mtime=?, ctime=?, fileid=?"
263                                " WHERE path=?",
264                                (size, mtime, ctime, fileid, path))
265        self.connection.commit()
266
267    def did_check_file_healthy(self, filecap, results):
268        now = time.time()
269        fileid = self.get_or_allocate_fileid_for_cap(filecap)
270        self.cursor.execute("UPDATE last_upload"
271                            " SET last_checked=?"
272                            " WHERE fileid=?",
273                            (now, fileid))
274        self.connection.commit()
275
276    def check_directory(self, contents):
277        """I will tell you if a new directory needs to be created for a given
278        set of directory contents, or if I know of an existing (immutable)
279        directory that can be used instead.
280
281        'contents' should be a dictionary that maps from child name (a single
282        unicode string) to immutable childcap (filecap or dircap).
283
284        I return a DirectoryResult object, synchronously. If r.was_created()
285        returns False, you should create the directory (with
286        t=mkdir-immutable). When you are finished, call r.did_create(dircap)
287        so I can update my database.
288
289        If was_created() returns a dircap, you might be able to avoid the
290        mkdir. Call r.should_check(), and if it says False, you can skip the
291        mkdir and use the dircap returned by was_created().
292
293        If should_check() returns True, you should perform a check operation
294        on the dircap returned by was_created(). If the check indicates the
295        directory is healthy, please call
296        r.did_check_healthy(checker_results) so I can update the database,
297        using the de-JSONized response from the webapi t=check call for
298        'checker_results'. If the check indicates the directory is not
299        healthy, please repair or re-create the directory and call
300        r.did_create(dircap) when you're done.
301        """
302
303        now = time.time()
304        entries = []
305        for name in contents:
306            entries.append( [name.encode("utf-8"), contents[name]] )
307        entries.sort()
308        data = b"".join([netstring(name_utf8)+netstring(cap)
309                         for (name_utf8,cap) in entries])
310        dirhash = backupdb_dirhash(data)
311        dirhash_s = base32.b2a(dirhash)
312        c = self.cursor
313        c.execute("SELECT dircap, last_checked"
314                  " FROM directories WHERE dirhash=?", (dirhash_s,))
315        row = c.fetchone()
316        if not row:
317            return DirectoryResult(self, dirhash_s, None, False)
318        (dircap, last_checked) = row
319        age = now - last_checked
320
321        probability = ((age - self.NO_CHECK_BEFORE) /
322                       (self.ALWAYS_CHECK_AFTER - self.NO_CHECK_BEFORE))
323        probability = min(max(probability, 0.0), 1.0)
324        should_check = bool(random.random() < probability)
325
326        return DirectoryResult(self, dirhash_s, to_bytes(dircap), should_check)
327
328    def did_create_directory(self, dircap, dirhash):
329        now = time.time()
330        # if the dirhash is already present (i.e. we've re-uploaded an
331        # existing directory, possibly replacing the dircap with a new one),
332        # update the record in place. Otherwise create a new record.)
333        self.cursor.execute("REPLACE INTO directories VALUES (?,?,?,?)",
334                            (dirhash, dircap, now, now))
335        self.connection.commit()
336
337    def did_check_directory_healthy(self, dircap, results):
338        now = time.time()
339        self.cursor.execute("UPDATE directories"
340                            " SET last_checked=?"
341                            " WHERE dircap=?",
342                            (now, dircap))
343        self.connection.commit()
Note: See TracBrowser for help on using the repository browser.