1 | #! /usr/bin/env python |
---|
2 | |
---|
3 | |
---|
4 | import hashlib |
---|
5 | import os, random |
---|
6 | |
---|
7 | from pkg_resources import require |
---|
8 | require('PyRRD') |
---|
9 | from pyrrd import graph |
---|
10 | from pyrrd.rrd import DataSource, RRD, RRA |
---|
11 | |
---|
12 | |
---|
def sha(s):
    """Return the raw (binary, not hex) SHA-1 digest of the byte string ``s``."""
    hasher = hashlib.sha1()
    hasher.update(s)
    return hasher.digest()
---|
15 | |
---|
def randomid():
    """Return a fresh identifier made of 20 random bytes from ``os.urandom``."""
    id_length = 20
    return os.urandom(id_length)
---|
18 | |
---|
class Node(object):
    """A simulated peer that publishes files and stores shares for other peers.

    Attributes:
        nid: identifier of this node (combined with a file id when peers are
            ordered for a file).
        introducer: object that knows all nodes and tracks living files.
        simulator: owner of the simulation; supplies ``P_NODEAVAIL`` and
            receives ``lost_data()`` notifications.
        shares: maps fileid -> size of the share this node holds for it.
        capacity: storage budget, drawn from ``random.randrange(1000)``.
        utilization: running total of the sizes stored in ``shares``.
        files: ``(fileid, numshares)`` for every file this node published.
    """

    def __init__(self, nid, introducer, simulator):
        self.nid = nid
        self.introducer = introducer
        self.simulator = simulator
        self.shares = {}
        self.capacity = random.randrange(1000)
        self.utilization = 0
        self.files = []

    def permute_peers(self, fileid):
        """Return the sorted ``(permuted_id, node)`` peer list for ``fileid``.

        The introducer owns the node list and already implements this
        ordering, so the work is delegated instead of being duplicated here.
        """
        return self.introducer.permute_peers(fileid)

    def publish_file(self, fileid, size, numshares=100):
        """Place ``numshares`` shares of a file on peers, one share per peer.

        Peers are offered a share in permuted order until enough of them
        have accepted or the peer list is exhausted.

        Args:
            fileid: identifier of the file being published.
            size: size of the file; each share is ``4 * size / numshares``.
            numshares: number of shares that must be placed (must be
                non-zero, otherwise the share-size division fails).

        Returns:
            ``(True, tried)`` on success, where ``tried`` is the number of
            peers that were asked; ``False`` if not every share could be
            placed (shares already handed out are deleted again).
        """
        sharesize = 4 * size / numshares
        remaining = numshares
        tried = 0
        last_givento = None  # permuted id of the last peer that was asked
        holders = []
        for pid, node in self.permute_peers(fileid):
            if not remaining:
                break
            tried += 1
            last_givento = pid
            if node.accept_share(fileid, sharesize):
                holders.append(node)
                remaining -= 1
        if remaining:
            # couldn't push every share: roll back the ones already placed
            for node in holders:
                node.delete_share(fileid)
            return False
        # Record the requested share count. The loop above counts a separate
        # variable down, so this is the real total that retrieve_file() needs
        # (the old code stored the exhausted counter, i.e. always 0).
        self.files.append((fileid, numshares))
        self.introducer.please_preserve(fileid, size, tried, last_givento)
        return (True, tried)

    def accept_share(self, fileid, sharesize):
        """Store a share if there is room (or room can be made).

        NOTE: "room" only means ``utilization < capacity``; the share size
        itself is not compared against the free space on that path.

        Returns:
            True if the share was stored, False if this node is full and
            ``decide()`` declined to evict anything.
        """
        if self.utilization >= self.capacity:
            if not self.decide(sharesize):
                # we're full, try elsewhere
                return False
            # we're full, but we'll make room
            self.make_space(sharesize)
        self.shares[fileid] = sharesize
        self.utilization += sharesize
        return True

    def decide(self, sharesize):
        """Policy hook: should existing shares be evicted for a new one?

        This implementation always answers False.
        """
        return False

    def make_space(self, sharesize):
        """Evict randomly chosen shares until ``sharesize`` fits.

        Every evicted share is reported to ``simulator.lost_data()``.
        """
        assert sharesize <= self.capacity
        while self.capacity - self.utilization < sharesize:
            # list(...) is required: random.choice needs an indexable
            # sequence and Python 3 dict views are not indexable.
            victim = random.choice(list(self.shares))
            self.simulator.lost_data(self.shares[victim])
            self.delete_share(victim)

    def delete_share(self, fileid):
        """Drop the share held for ``fileid``; return whether one existed."""
        size = self.shares.pop(fileid, None)
        if size is None:
            return False
        self.utilization -= size
        return True

    def has_share(self, fileid):
        """Return True if this node currently holds a share of ``fileid``."""
        return fileid in self.shares

    def retrieve_file(self):
        """Check whether a randomly chosen published file is still retrievable.

        A file counts as retrievable once a quarter of its shares
        (``numshares / 4``) are found on available peers. Each peer is
        skipped when ``random.random()`` exceeds ``simulator.P_NODEAVAIL``.

        Returns:
            None if this node has published nothing, otherwise True/False.
        """
        if not self.files:
            return
        fileid, numshares = random.choice(self.files)
        needed = numshares / 4
        found = 0
        for _pid, node in self.permute_peers(fileid):
            if random.random() > self.simulator.P_NODEAVAIL:
                continue  # node isn't available right now
            if node.has_share(fileid):
                found += 1
                if found >= needed:
                    return True
        return False

    def delete_file(self):
        """Forget one randomly chosen published file and delete it grid-wide.

        Returns:
            False if this node has no published files, True otherwise.
        """
        if not self.files:
            return False
        which = random.choice(self.files)
        self.files.remove(which)
        fileid, _numshares = which
        self.introducer.delete(fileid)
        return True
---|
113 | |
---|
class Introducer(object):
    """Registry that knows every node and tracks the files currently alive.

    Attributes:
        living_files: maps fileid -> ``(size, tried, last_givento)`` as
            reported through ``please_preserve()``.
        utilization: total size of all active files.
        all_nodes: list of nodes in the grid; starts empty and is expected
            to be assigned by the owner after construction.
        simulator: receives a ``stamp_utilization()`` call whenever
            ``utilization`` changes (and once at construction).
    """

    def __init__(self, simulator):
        self.living_files = {}
        self.utilization = 0  # total size of all active files
        # Initialised here so get_all_nodes() works before the owner has
        # attached the real node list.
        self.all_nodes = []
        self.simulator = simulator
        self.simulator.stamp_utilization(self.utilization)

    def get_all_nodes(self):
        """Return the list of all known nodes."""
        return self.all_nodes

    def please_preserve(self, fileid, size, tried, last_givento):
        """Register a newly published file and account for its size.

        Args:
            fileid: identifier of the file.
            size: file size, added to ``utilization``.
            tried: number of peers that were asked to take a share.
            last_givento: permuted id of the last peer that was asked.
        """
        self.living_files[fileid] = (size, tried, last_givento)
        self.utilization += size
        self.simulator.stamp_utilization(self.utilization)

    def please_delete(self, fileid):
        """Alias for ``delete()``."""
        self.delete(fileid)

    def permute_peers(self, fileid):
        """Return ``(sha(fileid + nid), node)`` pairs sorted by digest."""
        permuted = [(sha(fileid + n.nid), n) for n in self.get_all_nodes()]
        # Sort on the digest only, so a digest tie can never fall through
        # to comparing node objects (which define no ordering).
        permuted.sort(key=lambda entry: entry[0])
        return permuted

    def delete(self, fileid):
        """Remove a living file and delete its shares from the peers.

        Peers are visited in permuted order up to and including the one
        whose permuted id is ``last_givento``, stopping early once ``tried``
        peers have reported that they held a share.

        Raises:
            KeyError: if ``fileid`` is not a living file.
        """
        size, tried, last_givento = self.living_files[fileid]
        remaining = tried
        # Plain iteration replaces the old `pid = ""` sentinel loop: the
        # sentinel was a str compared against bytes digests (TypeError on
        # Python 3), and popping could run off the end of the peer list.
        for pid, node in self.permute_peers(fileid):
            if not remaining:
                break
            if node.delete_share(fileid):
                remaining -= 1
            if pid >= last_givento:
                break
        self.utilization -= size
        self.simulator.stamp_utilization(self.utilization)
        del self.living_files[fileid]
---|
150 | |
---|
class Simulator(object):
    """Event-driven simulation of files being added to and deleted from a grid.

    Each event type in ``EVENTS`` has a ``RATE_<type>`` class attribute; the
    delay until its next occurrence is drawn with ``random.expovariate``.
    Total utilization is buffered into an RRD database that
    ``write_graph()`` later renders as a PNG.
    """

    NUM_NODES = 1000
    EVENTS = ["ADDFILE", "DELFILE", "ADDNODE", "DELNODE"]
    RATE_ADDFILE = 1.0 / 10
    RATE_DELFILE = 1.0 / 20
    RATE_ADDNODE = 1.0 / 3000
    RATE_DELNODE = 1.0 / 4000
    P_NODEAVAIL = 1.0

    # Single definition of the output locations, so the RRD that is created
    # in __init__ and the one read back in write_graph() cannot drift apart.
    RRD_PATH = "/tmp/utilization.rrd"
    GRAPH_PATH = "/tmp/utilization.png"

    def __init__(self):
        self.time = 1164783600  # small numbers of seconds since the epoch confuse rrdtool
        self.prevstamptime = int(self.time)

        ds = DataSource(ds_name='utilizationds', ds_type='GAUGE', heartbeat=1)
        rra = RRA(cf='AVERAGE', xff=0.1, steps=1, rows=1200)
        self.rrd = RRD(self.RRD_PATH, ds=[ds], rra=[rra], start=self.time)
        self.rrd.create()

        self.verbose = False
        self.added_files = 0
        self.added_data = 0
        self.deleted_files = 0
        self.published_files = []  # number of peers tried, per published file
        self.failed_files = 0
        self.lost_data_bytes = 0  # bytes deleted to make room for new shares

        # The introducer stamps utilization from its constructor, so time,
        # prevstamptime and rrd must already be set at this point.
        self.introducer = q = Introducer(self)
        self.all_nodes = [Node(randomid(), q, self)
                          for i in range(self.NUM_NODES)]
        q.all_nodes = self.all_nodes

        self.next = []  # pending (time, event type) pairs, kept sorted
        self.schedule_events()

    def stamp_utilization(self, utilization):
        """Buffer a utilization sample, rate-limited by simulated time.

        A sample is recorded only when the whole-second simulated time is
        more than one second past the previously recorded stamp.
        """
        now = int(self.time)
        if now > self.prevstamptime + 1:
            self.rrd.bufferValue(self.time, utilization)
            self.prevstamptime = now

    def write_graph(self):
        """Flush buffered samples to the RRD and render them to ``GRAPH_PATH``.

        This drops ``self.rrd``; a later ``stamp_utilization()`` call that
        passes its time check would fail because ``self.rrd`` is then None.
        """
        self.rrd.update()
        self.rrd = None
        # NOTE(review): presumably forces the RRD object to be finalized
        # before the file is read back for graphing -- confirm.
        import gc
        gc.collect()

        def1 = graph.DataDefinition(vname="a", rrdfile=self.RRD_PATH, ds_name='utilizationds')
        area1 = graph.Area(value="a", color="#990033", legend='utilizationlegend')
        g = graph.Graph(self.GRAPH_PATH, imgformat='PNG', width=540, height=100,
                        vertical_label='utilizationverticallabel',
                        title='utilizationtitle', lower_limit=0)
        g.data.append(def1)
        g.data.append(area1)
        g.write()

    def add_file(self):
        """Publish a file of random size from a randomly chosen node."""
        size = random.randrange(1000)
        publisher = random.choice(self.all_nodes)
        if self.verbose:
            print("add_file(size=%d, from node %s)" % (size, publisher))
        fileid = randomid()
        outcome = publisher.publish_file(fileid, size)
        if not outcome:
            self.failed_files += 1
            return
        # a successful publish returns (True, number_of_peers_tried)
        _ok, tried = outcome
        self.added_files += 1
        self.added_data += size
        self.published_files.append(tried)

    def lost_data(self, size):
        """Account for ``size`` bytes evicted to make room for new shares."""
        self.lost_data_bytes += size

    def delete_file(self):
        """Delete one file from the first node (in random order) that has any."""
        candidates = self.all_nodes[:]
        random.shuffle(candidates)
        for node in candidates:
            if node.delete_file():
                self.deleted_files += 1
                return
        print("no files to delete")

    def _add_event(self, etype):
        """Schedule the next occurrence of ``etype`` after the current time."""
        rate = getattr(self, "RATE_" + etype)
        when = self.time + random.expovariate(rate)
        self.next.append((when, etype))
        self.next.sort()

    def schedule_events(self):
        """Make sure every event type has one pending occurrence."""
        pending = {entry[1] for entry in self.next}
        for etype in self.EVENTS:
            if etype not in pending:
                self._add_event(etype)

    def do_event(self):
        """Advance to the earliest pending event, reschedule it and run it."""
        when, etype = self.next.pop(0)
        assert when > self.time
        self.time = when
        self._add_event(etype)
        if etype == "ADDFILE":
            self.add_file()
        elif etype == "DELFILE":
            self.delete_file()
        # ADDNODE and DELNODE are scheduled but have no handler in this
        # class (no add_node()/del_node() exists), so they only advance time.

    def print_stats_header(self):
        """Print the column header for ``print_stats()`` output."""
        print("time: added failed lost avg_tried")

    def print_stats(self, time, etype):
        """Print one line of running statistics for the given event."""
        if self.published_files:
            avg_tried = sum(self.published_files) / len(self.published_files)
        else:
            avg_tried = "NONE"
        print(time, etype, self.added_data, self.failed_files, self.lost_data_bytes, avg_tried, len(self.introducer.living_files), self.introducer.utilization)
---|
268 | |
---|
# Most recently created Simulator; set by main() so it can be inspected
# after a run (e.g. from an interactive session).
s = None


def main(num_events=1000):
    """Build a Simulator, process ``num_events`` events and print a summary.

    Args:
        num_events: number of events to process (default 1000).

    Returns:
        The Simulator instance, which is also stored in the module-level
        variable ``s``.
    """
    global s
    s = Simulator()
    for _ in range(num_events):
        s.do_event()
    print("%d files added, %d files deleted" % (s.added_files, s.deleted_files))
    return s
---|
284 | |
---|
# Run the simulation only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
---|