source: trunk/misc/simulators/simulator.py

Last change on this file was b856238, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-15T15:53:34Z

remove old Python2 future statements

  • Property mode set to 100644
File size: 8.9 KB
Line 
1#! /usr/bin/env python
2
3
4import hashlib
5import os, random
6
7from pkg_resources import require
8require('PyRRD')
9from pyrrd import graph
10from pyrrd.rrd import DataSource, RRD, RRA
11
12
13def sha(s):
14    return hashlib.sha1(s).digest()
15
16def randomid():
17    return os.urandom(20)
18
19class Node(object):
20    def __init__(self, nid, introducer, simulator):
21        self.nid = nid
22        self.introducer = introducer
23        self.simulator = simulator
24        self.shares = {}
25        self.capacity = random.randrange(1000)
26        self.utilization = 0
27        self.files = []
28
29    def permute_peers(self, fileid):
30        permuted = [(sha(fileid+n.nid),n)
31                    for n in self.introducer.get_all_nodes()]
32        permuted.sort()
33        return permuted
34
35    def publish_file(self, fileid, size, numshares=100):
36        sharesize = 4 * size / numshares
37        permuted = self.permute_peers(fileid)
38        last_givento = None
39        tried = 0
40        givento = []
41        while numshares and permuted:
42            pid,node = permuted.pop(0)
43            tried += 1
44            last_givento = pid
45            if node.accept_share(fileid, sharesize):
46                givento.append((pid,node))
47                numshares -= 1
48        if numshares:
49            # couldn't push, should delete
50            for pid,node in givento:
51                node.delete_share(fileid)
52            return False
53        self.files.append((fileid, numshares))
54        self.introducer.please_preserve(fileid, size, tried, last_givento)
55        return (True, tried)
56
57    def accept_share(self, fileid, sharesize):
58        if self.utilization < self.capacity:
59            # we have room! yay!
60            self.shares[fileid] = sharesize
61            self.utilization += sharesize
62            return True
63        if self.decide(sharesize):
64            # we don't, but we'll make room
65            self.make_space(sharesize)
66            self.shares[fileid] = sharesize
67            self.utilization += sharesize
68            return True
69        else:
70            # we're full, try elsewhere
71            return False
72
73    def decide(self, sharesize):
74        return False
75
76    def make_space(self, sharesize):
77        assert sharesize <= self.capacity
78        while self.capacity - self.utilization < sharesize:
79            victim = random.choice(self.shares.keys())
80            self.simulator.lost_data(self.shares[victim])
81            self.delete_share(victim)
82
83    def delete_share(self, fileid):
84        if fileid in self.shares:
85            self.utilization -= self.shares[fileid]
86            del self.shares[fileid]
87            return True
88        return False
89
90    def retrieve_file(self):
91        if not self.files:
92            return
93        fileid,numshares = random.choice(self.files)
94        needed = numshares / 4
95        peers = []
96        for pid,node in self.permute_peers(fileid):
97            if random.random() > self.simulator.P_NODEAVAIL:
98                continue # node isn't available right now
99            if node.has_share(fileid):
100                peers.append(node)
101            if len(peers) >= needed:
102                return True
103        return False
104
105    def delete_file(self):
106        if not self.files:
107            return False
108        which = random.choice(self.files)
109        self.files.remove(which)
110        fileid,numshares = which
111        self.introducer.delete(fileid)
112        return True
113
114class Introducer(object):
115    def __init__(self, simulator):
116        self.living_files = {}
117        self.utilization = 0 # total size of all active files
118        self.simulator = simulator
119        self.simulator.stamp_utilization(self.utilization)
120
121    def get_all_nodes(self):
122        return self.all_nodes
123
124    def please_preserve(self, fileid, size, tried, last_givento):
125        self.living_files[fileid] = (size, tried, last_givento)
126        self.utilization += size
127        self.simulator.stamp_utilization(self.utilization)
128
129    def please_delete(self, fileid):
130        self.delete(fileid)
131
132    def permute_peers(self, fileid):
133        permuted = [(sha(fileid+n.nid),n)
134                    for n in self.get_all_nodes()]
135        permuted.sort()
136        return permuted
137
138    def delete(self, fileid):
139        permuted = self.permute_peers(fileid)
140        size, tried, last_givento = self.living_files[fileid]
141        pid = ""
142        while tried and pid < last_givento:
143            pid,node = permuted.pop(0)
144            had_it = node.delete_share(fileid)
145            if had_it:
146                tried -= 1
147        self.utilization -= size
148        self.simulator.stamp_utilization(self.utilization)
149        del self.living_files[fileid]
150
151class Simulator(object):
152    NUM_NODES = 1000
153    EVENTS = ["ADDFILE", "DELFILE", "ADDNODE", "DELNODE"]
154    RATE_ADDFILE = 1.0 / 10
155    RATE_DELFILE = 1.0 / 20
156    RATE_ADDNODE = 1.0 / 3000
157    RATE_DELNODE = 1.0 / 4000
158    P_NODEAVAIL = 1.0
159
160    def __init__(self):
161        self.time = 1164783600 # small numbers of seconds since the epoch confuse rrdtool
162        self.prevstamptime = int(self.time)
163
164        ds = DataSource(ds_name='utilizationds', ds_type='GAUGE', heartbeat=1)
165        rra = RRA(cf='AVERAGE', xff=0.1, steps=1, rows=1200)
166        self.rrd = RRD("/tmp/utilization.rrd", ds=[ds], rra=[rra], start=self.time)
167        self.rrd.create()
168
169        self.introducer = q = Introducer(self)
170        self.all_nodes = [Node(randomid(), q, self)
171                          for i in range(self.NUM_NODES)]
172        q.all_nodes = self.all_nodes
173        self.next = []
174        self.schedule_events()
175        self.verbose = False
176
177        self.added_files = 0
178        self.added_data = 0
179        self.deleted_files = 0
180        self.published_files = []
181        self.failed_files = 0
182        self.lost_data_bytes = 0 # bytes deleted to make room for new shares
183
184    def stamp_utilization(self, utilization):
185        if int(self.time) > (self.prevstamptime+1):
186            self.rrd.bufferValue(self.time, utilization)
187            self.prevstamptime = int(self.time)
188
189    def write_graph(self):
190        self.rrd.update()
191        self.rrd = None
192        import gc
193        gc.collect()
194
195        def1 = graph.DataDefinition(vname="a", rrdfile='/tmp/utilization.rrd', ds_name='utilizationds')
196        area1 = graph.Area(value="a", color="#990033", legend='utilizationlegend')
197        g = graph.Graph('/tmp/utilization.png', imgformat='PNG', width=540, height=100, vertical_label='utilizationverticallabel', title='utilizationtitle', lower_limit=0)
198        g.data.append(def1)
199        g.data.append(area1)
200        g.write()
201
202    def add_file(self):
203        size = random.randrange(1000)
204        n = random.choice(self.all_nodes)
205        if self.verbose:
206            print("add_file(size=%d, from node %s)" % (size, n))
207        fileid = randomid()
208        able = n.publish_file(fileid, size)
209        if able:
210            able, tried = able
211            self.added_files += 1
212            self.added_data += size
213            self.published_files.append(tried)
214        else:
215            self.failed_files += 1
216
217    def lost_data(self, size):
218        self.lost_data_bytes += size
219
220    def delete_file(self):
221        all_nodes = self.all_nodes[:]
222        random.shuffle(all_nodes)
223        for n in all_nodes:
224            if n.delete_file():
225                self.deleted_files += 1
226                return
227        print("no files to delete")
228
229    def _add_event(self, etype):
230        rate = getattr(self, "RATE_" + etype)
231        next = self.time + random.expovariate(rate)
232        self.next.append((next, etype))
233        self.next.sort()
234
235    def schedule_events(self):
236        types = set([e[1] for e in self.next])
237        for etype in self.EVENTS:
238            if not etype in types:
239                self._add_event(etype)
240
241    def do_event(self):
242        time, etype = self.next.pop(0)
243        assert time > self.time
244        # current_time = self.time
245        self.time = time
246        self._add_event(etype)
247        if etype == "ADDFILE":
248            self.add_file()
249        elif etype == "DELFILE":
250            self.delete_file()
251        elif etype == "ADDNODE":
252            pass
253            #self.add_node()
254        elif etype == "DELNODE":
255            #self.del_node()
256            pass
257        # self.print_stats(current_time, etype)
258
259    def print_stats_header(self):
260        print("time:  added   failed   lost  avg_tried")
261
262    def print_stats(self, time, etype):
263        if not self.published_files:
264            avg_tried = "NONE"
265        else:
266            avg_tried = sum(self.published_files) / len(self.published_files)
267        print(time, etype, self.added_data, self.failed_files, self.lost_data_bytes, avg_tried, len(self.introducer.living_files), self.introducer.utilization)
268
269s = None
270
271def main():
272#    rrdtool.create("foo.rrd",
273#                   "--step 10",
274#                   "DS:files-added:DERIVE::0:1000",
275#                   "RRA:AVERAGE:1:1:1200",
276#                   )
277    global s
278    s = Simulator()
279    # s.print_stats_header()
280    for i in range(1000):
281        s.do_event()
282    print("%d files added, %d files deleted" % (s.added_files, s.deleted_files))
283    return s
284
285if __name__ == '__main__':
286    main()
Note: See TracBrowser for help on using the repository browser.