1 | # -*- python -*- |
---|
2 | |
---|
3 | |
---|
4 | """ |
---|
5 | # run this tool on a linux box in its own directory, with a file named |
---|
6 | # 'pids.txt' describing which processes to watch. It will follow CPU usage of |
---|
7 | # the given processes, and compute 1/5/15-minute moving averages for each |
---|
8 | # process. These averages can be retrieved from a foolscap connection |
---|
9 | # (published at ./watcher.furl), or through an HTTP query (using ./webport). |
---|
10 | |
---|
11 | # Each line of pids.txt describes a single process. Blank lines and ones that |
---|
12 | # begin with '#' are ignored. Each line is either "PID" or "PID NAME" (space |
---|
13 | # separated). PID is either a numeric process ID, a pathname to a file that |
---|
14 | # contains a process ID, or a pathname to a directory that contains a |
---|
15 | # twistd.pid file (which contains a process ID). NAME is an arbitrary string |
---|
16 | # that will be used to describe the process to watcher.furl subscribers, and |
---|
17 | # defaults to PID if not provided. |
---|
18 | """ |
---|
19 | |
---|
20 | # TODO: |
---|
21 | # built-in graphs on web interface |
---|
22 | |
---|
23 | |
---|
24 | import pickle, os.path, time, pprint |
---|
25 | from twisted.application import internet, service, strports |
---|
26 | from twisted.web import server, resource, http |
---|
27 | from twisted.python import log |
---|
28 | import json |
---|
29 | from foolscap import Tub, Referenceable, RemoteInterface, eventual |
---|
30 | from foolscap.schema import ListOf, TupleOf |
---|
31 | from zope.interface import implements |
---|
32 | |
---|
def read_cpu_times(pid):
    """Return the CPU time consumed so far by process `pid`.

    Reads /proc/<pid>/stat and returns a tuple
    (userspace_seconds, system_seconds) of floats. Time spent by
    waited-for children is not included.

    Raises EnvironmentError if the stat file cannot be read (e.g. the
    process no longer exists) and ValueError/IndexError if its contents
    cannot be parsed.
    """
    # Close the file promptly instead of relying on garbage collection.
    with open("/proc/%d/stat" % pid, "r") as f:
        data = f.read()
    # The second field (the command name) is wrapped in parentheses and may
    # itself contain spaces or parentheses, so a plain split() of the whole
    # line can shift every later field. Everything after the *last* ')' is
    # reliably whitespace-separated.
    fields = data[data.rindex(")") + 1:].split()
    # fields[0] is the third stat field (state), so utime and stime (the
    # 14th and 15th fields overall) are at indexes 11 and 12.
    utime_ticks = int(fields[11])
    stime_ticks = int(fields[12])
    # The values are in clock ticks; ask the system how many ticks make up
    # a second, and fall back to the traditional 100 if it will not say.
    try:
        hz = os.sysconf("SC_CLK_TCK")
    except (AttributeError, ValueError, OSError):
        hz = 100
    if hz <= 0:
        hz = 100
    userspace_seconds = utime_ticks * 1.0 / hz
    system_seconds = stime_ticks * 1.0 / hz
    return (userspace_seconds, system_seconds)
---|
46 | |
---|
47 | |
---|
def read_pids_txt():
    """Parse ./pids.txt and return a list of (pid, name) tuples.

    Blank lines and lines starting with '#' are skipped. The first
    whitespace-separated word of each remaining line is either a numeric
    process ID, the path of a file containing one, or the path of a
    directory containing a 'twistd.pid' file. The second word, if present,
    is used as the name; otherwise the first word is.

    Entries whose pidfile is missing, unreadable, or does not contain an
    integer are silently left out, so one bad entry does not hide the
    others. Raises EnvironmentError if pids.txt itself cannot be read.
    """
    processes = []
    with open("pids.txt", "r") as f:
        lines = f.readlines()
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split()
        pidthing = parts[0]
        name = parts[1] if len(parts) > 1 else pidthing
        try:
            pid = int(pidthing)
        except ValueError:
            # Not a number: treat it as a pidfile, or as a directory that
            # holds a twistd.pid file.
            pidfile = os.path.expanduser(pidthing)
            if os.path.isdir(pidfile):
                pidfile = os.path.join(pidfile, "twistd.pid")
            try:
                with open(pidfile, "r") as pf:
                    pid = int(pf.read().strip())
            except (EnvironmentError, ValueError):
                # Missing/unreadable pidfile, or one with garbage in it
                # (e.g. truncated while being rewritten): skip this entry.
                continue
        processes.append((pid, name))
    return processes
---|
74 | |
---|
# Wire schema shared by both remote interfaces: a list of
# (name, 1-min-avg, 5-min-avg, 15-min-avg) rows.
Averages = ListOf(TupleOf(str, float, float, float))
---|
class RICPUWatcherSubscriber(RemoteInterface):
    """Interface implemented by observers passed to RICPUWatcher.subscribe."""

    def averages(averages=Averages):
        """Receive the latest list of (name, 1min, 5min, 15min) rows."""
        return None
---|
79 | |
---|
class RICPUWatcher(RemoteInterface):
    """Remote interface offered by the CPU watcher."""

    def get_averages():
        """Return one row per watched process.

        Each row is (name, 1-min-avg, 5-min-avg, 15-min-avg): 'name' is a
        string and the averages are floats from 0.0 to 1.0. An average is
        the share of the CPU the process used over that window, i.e. the
        change in CPU time divided by the change in wallclock time.
        """
        return Averages

    def subscribe(observer=RICPUWatcherSubscriber):
        """Register an observer for updates.

        The observer is sent an 'averages' message every time the averages
        are updated. Its single argument is the same list of tuples that
        get_averages() returns.
        """
        return None
---|
95 | |
---|
class CPUWatcher(service.MultiService, resource.Resource, Referenceable):
    """Service that samples per-process CPU usage and publishes averages.

    Every POLL_INTERVAL seconds the processes listed in pids.txt are
    sampled with read_cpu_times(). Samples are kept in self.history (a dict
    mapping pid to a list of (wallclock, user_seconds, sys_seconds) tuples)
    and persisted to history.pickle. The resulting moving averages are
    held in self.current and exposed three ways: as a web resource, through
    the foolscap remote_get_averages() call, and pushed to subscribers.
    """
    implements(RICPUWatcher)
    POLL_INTERVAL = 30  # seconds
    HISTORY_LIMIT = 15 * 60  # 15min
    AVERAGES = (1*60, 5*60, 15*60)  # 1min, 5min, 15min

    def __init__(self):
        """Load any saved history and schedule the periodic poll."""
        service.MultiService.__init__(self)
        resource.Resource.__init__(self)
        try:
            # NOTE(review): unpickling runs arbitrary code from the file;
            # history.pickle must only ever be the file poll() wrote.
            with open("history.pickle", "rb") as f:
                self.history = pickle.load(f)
        except Exception:
            # Missing, truncated or otherwise unreadable: start afresh.
            self.history = {}
        self.current = []
        self.observers = set()
        ts = internet.TimerService(self.POLL_INTERVAL, self.poll)
        ts.setServiceParent(self)

    def startService(self):
        """Start the web listener and the foolscap Tub.

        The listening ports are read from ./webport and ./tubport when
        those files exist; otherwise a port is allocated ("tcp:0") and the
        chosen port is written back to the file for next time.
        """
        service.MultiService.startService(self)

        try:
            with open("webport", "r") as f:
                desired_webport = f.read().strip()
        except EnvironmentError:
            desired_webport = None
        webport = desired_webport or "tcp:0"
        root = self
        serv = strports.service(webport, server.Site(root))
        serv.setServiceParent(self)
        if not desired_webport:
            got_port = serv._port.getHost().port
            with open("webport", "w") as f:
                f.write("tcp:%d\n" % got_port)

        self.tub = Tub(certFile="watcher.pem")
        self.tub.setServiceParent(self)
        try:
            with open("tubport", "r") as f:
                desired_tubport = f.read().strip()
        except EnvironmentError:
            desired_tubport = None
        tubport = desired_tubport or "tcp:0"
        l = self.tub.listenOn(tubport)
        if not desired_tubport:
            got_port = l.getPortnum()
            with open("tubport", "w") as f:
                f.write("tcp:%d\n" % got_port)
        d = self.tub.setLocationAutomatically()
        d.addCallback(self._tub_ready)
        d.addErrback(log.err)

    def _tub_ready(self, res):
        """Publish this object once the Tub knows its location."""
        self.tub.registerReference(self, furlFile="watcher.furl")

    def getChild(self, path, req):
        """Serve the root URL ourselves; defer anything else to Resource."""
        if path == "":
            return self
        return resource.Resource.getChild(self, path, req)

    def render(self, req):
        """Render the current averages.

        The 't' query argument selects the format: "html" (the default)
        gives a pretty-printed listing, "json" gives JSON, and anything
        else yields a 400 response.
        """
        t = req.args.get("t", ["html"])[0]
        ctype = "text/plain"
        data = ""
        if t == "html":
            data = "# name, 1min, 5min, 15min\n"
            data += pprint.pformat(self.current) + "\n"
        elif t == "json":
            data = json.dumps(self.current, indent=1)
        else:
            req.setResponseCode(http.BAD_REQUEST)
            data = "Unknown t= %s\n" % t
        req.setHeader("content-type", ctype)
        return data

    def remote_get_averages(self):
        """Return the most recently computed list of average rows."""
        return self.current

    def remote_subscribe(self, observer):
        """Remember `observer` so poll() sends it each new set of averages."""
        self.observers.add(observer)

    def notify(self, observer):
        """Send the current averages to one observer.

        An observer whose call fails is logged and dropped.
        """
        d = observer.callRemote("averages", self.current)

        def _error(f):
            log.msg("observer error, removing them")
            log.msg(f)
            self.observers.discard(observer)
        d.addErrback(_error)

    def poll(self):
        """Take one sample of every watched process and refresh averages.

        Re-reads pids.txt, appends a sample for each process that can be
        read, trims each history list, saves the history to disk, rebuilds
        self.current and schedules a notification to every observer.
        """
        # Floor division: the result is used as a list length, and true
        # division would give a float on Python 3.
        max_history = self.HISTORY_LIMIT // self.POLL_INTERVAL
        try:
            processes = read_pids_txt()
        except Exception:
            log.err()
            return
        for (pid, name) in processes:
            samples = self.history.setdefault(pid, [])
            now = time.time()
            try:
                (user_seconds, sys_seconds) = read_cpu_times(pid)
            except Exception:
                log.msg("error reading process %s (%s), ignoring" % (pid, name))
                log.err()
                continue
            samples.append((now, user_seconds, sys_seconds))
            # Keep one more sample than intervals, so the longest average
            # has both of its endpoints.
            del samples[:-(max_history + 1)]
        try:
            # Newer protocols won't work in Python 2; when it is dropped,
            # protocol v4 can be used (added in Python 3.4).
            # The temp file is closed (and so flushed) before the rename,
            # so a partially written pickle never replaces the old one.
            with open("history.pickle.tmp", "wb") as f:
                pickle.dump(self.history, f, protocol=2)
            os.rename("history.pickle.tmp", "history.pickle")
        except Exception:
            # Saving is best-effort, but say why it failed.
            log.err()
        current = []
        for (pid, name) in processes:
            row = [name]
            for avg in self.AVERAGES:
                row.append(self._average_N(pid, avg))
            current.append(tuple(row))
        self.current = current
        print(current)
        for ob in self.observers:
            eventual.eventually(self.notify, ob)

    def _average_N(self, pid, seconds):
        """Return the CPU share used by `pid` over the last `seconds`.

        The result is (user + system CPU time consumed) / (wallclock time
        elapsed) between the newest sample and the one `seconds` worth of
        polls before it. Returns None when there are not yet enough
        samples, and 0.0 when no wallclock time elapsed between them.

        NOTE(review): the Averages schema declares these values as floats;
        presumably a None here is rejected by foolscap -- confirm.
        """
        # Floor division so the value is usable as a list index on
        # Python 3 as well as Python 2.
        num_samples = seconds // self.POLL_INTERVAL
        samples = self.history[pid]
        if len(samples) < num_samples + 1:
            return None
        first = -num_samples - 1
        elapsed_wall = samples[-1][0] - samples[first][0]
        elapsed_user = samples[-1][1] - samples[first][1]
        elapsed_sys = samples[-1][2] - samples[first][2]
        if elapsed_wall == 0.0:
            return 0.0
        return (elapsed_user + elapsed_sys) / elapsed_wall
---|
231 | |
---|
# The 'application' object below is what hosts the watcher service.
application = service.Application("cpu-watcher")
CPUWatcher().setServiceParent(application)
---|