monia.py
# -*- coding: utf-8 -*-
# @COPYRIGHT_begin
#
# Copyright [2010-2014] Institute of Nuclear Physics PAN, Krakow, Poland
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# @COPYRIGHT_end

##
# @package src.cm.utils.threads.monia
#
# @author Tomek Wojtoń
#
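##
# Monitoring threads of the cluster manager: start_monia() spawns a
# MonitorInitiator thread, which polls the nodes over libvirt, and a
# CleanerThread, which removes stale RRD files; stop_monia() terminates
# both. Collected statistics are handed over to cm.utils.monia.RrdHandler.
#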
import threading
import libvirt
import time
import sys
import re
import cm.utils.monia
from cm.models.node import Node
from common.states import node_states
from cm.utils import log
from cm import settings

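##
#
# Returns the list of nodes to monitor: each node in state 'ok' with its
# address, state and libvirt connection string.
# @returns{list(dict)}
#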
def get_nodes():
    nlist = [{'address': node.dict['address'],
              'state': node.dict['state'],
              'conn_string': node.conn_string}
             for node in Node.objects.filter(state__exact=node_states['ok'])]

    return nlist


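##
#
# Pushes a fresh node list to the running MonitorInitiator thread.
# @returns{list}
#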
def update_monia():
    if not settings.MONITOR_ENABLE:
        return 'Monitoring disabled'
    nlist = get_nodes()
    if not nlist:
        return 'No nodes to monitor'
    e = threading.enumerate()
    for i in e:
        if i.name == "initiator":
            i.update_nodes(nlist)
    return nlist


##
#
# Starts the system monitoring
# @returns{list}
#
def start_monia():
    if not settings.MONITOR_ENABLE:
        stop_monia()
        return 'Monitoring disabled'
    nlist = get_nodes()
    if not nlist:
        stop_monia()
        return 'No nodes to monitor'
    r = []
    e = threading.enumerate()

    # update list of nodes in MonitorInitiator thread
    for t in e:
        if t.name == "initiator":
            t.update_nodes(nlist)
            log.info(0, 'Monitoring nodes list updated')
            r.append('node list updated')

    # start MonitorInitiator thread...
    if not [t for t in e if t.name == 'initiator']:
        monitor = MonitorInitiator()
        monitor.start()
        r.append('initiator started')
        log.info(0, 'Monitoring thread MonitorInitiator started')

    # start CleanerThread thread...
    if not [t for t in e if t.name == 'cleaner']:
        cl = CleanerThread()
        cl.start()
        r.append('cleaner started')
        log.info(0, 'Monitoring thread CleanerThread started')

    #log.info(0, 'Monitoring threads %s started' % str(r))
    return r


##
#
# Stops the monitoring system
# @returns{list}
#
def stop_monia():
    t = threading.activeCount()
    e = threading.enumerate()
    th = []
    for i in e:
        th.append(i.getName())
        if i.getName() == "initiator":
            i.kill()
        if i.getName() == "cleaner":
            i.kill()
    log.info(0, 'Monitoring threads stopped')
    return [str(t), str(th)]


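##
#
# Thread cycling through the monitored nodes: it spawns a MonitorThread
# for one node per iteration, spreading the node checks evenly over
# PERIOD seconds.
#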
class MonitorInitiator(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.name = "initiator"
        self.running = True
        if not cm.utils.monia.os.path.exists(settings.PATH_TO_RRD):
            cm.utils.monia.os.makedirs(settings.PATH_TO_RRD)
        if not cm.utils.monia.os.path.exists(settings.BACKUP_PATH):
            cm.utils.monia.os.makedirs(settings.BACKUP_PATH)

        # assumption: cm.utils.monia provides the ring buffer used below (add/clear/get)
        self.rb = cm.utils.monia.RingBuffer()

        nlist = get_nodes()
        self.frequency = settings.PERIOD * 1.0 / len(nlist)
        for n in nlist:
            self.rb.add(n)
        #self.start()

    def update_nodes(self, nlist):
        log.info(0, 'updating nodes list')
        self.rb.clear()
        for n in nlist:
            self.rb.add(n)

    def run(self):
        while self.running:
            st1 = time.time()
            try:
                one = self.rb.get()
                threads = threading.enumerate()
                # spawn a MonitorThread for the node unless one is still running for it
                if not one['address'] in [i.name for i in threads]:
                    t = MonitorThread(one)
                    t.start()
            except Exception, e:
                log.error(0, 'Monitoring error: %s' % (e))
            st2 = time.time() - st1
            if st2 < self.frequency:
                time.sleep(self.frequency - st2)
            else:
                log.info(0, "mon_time %f %s threads: %d" % (st2, one['address'], len(threads)))
        log.info(0, "MonitorInitiator stopped")

    def kill(self):
        log.info(0, "stopping MonitorInitiator... ")
        self.running = False
        # sys.exit()


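##
#
# Thread removing RRD files of machines that have not been updated for
# longer than TIME_TO_REMOVE seconds, checking every CLEANING_PERIOD
# seconds.
#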
class CleanerThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.name = "cleaner"
        self.running = True

    def kill(self):
        log.info(0, "stopping CleanerThread... ")
        self.running = False
        # sys.exit()

    def run(self):
        try:
            while self.running:
                time.sleep(settings.CLEANING_PERIOD)
                # assumption: cm.utils.monia tracks the existing RRDs as a dict
                # mapping vm name to (data, last update timestamp)
                rrds = cm.utils.monia.get_rrds()
                for vm in rrds:
                    if time.time() - settings.TIME_TO_REMOVE > rrds[vm][1]:
                        cm.utils.monia.RrdHandler({'name': str(vm), 'data': None}).remove()
            log.info(0, "CleanerThread stopped")
        except Exception, e:
            log.exception(0, 'CleanerThread: %s' % (e))


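##
#
# Thread monitoring a single node: it connects over libvirt, reads CPU,
# memory, disk and network statistics of the running domains and stores
# them in RRDs. A watchdog timer kills the thread after TIMEOUT seconds.
#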
class MonitorThread(threading.Thread):
    def __init__(self, data):
        threading.Thread.__init__(self)
        self.addr = data['conn_string']
        self.name = data['address']
        self.setDaemon(True)
        self.t = threading.Timer(settings.TIMEOUT, self.kill)
        self.t.name = 'timer-%s' % (self.name)
        self.t.start()

    def run(self):
        self.update()
        #log.debug(0, 'Checking node: %s'%(self.getName()))
        self.t.cancel()

    def update(self):
        r = self.read_node()
        if not r:
            return r
        vm_list = r[4]
        if vm_list:
            for vm in vm_list:
                # assumption: RrdHandler writes the collected statistics to the VM's RRD
                cm.utils.monia.RrdHandler(vm).update()

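    ##
    #
    # Connects to the node via libvirt and collects resource usage of the
    # node and of every running domain.
    # @returns{list} [used_cpu, used_memory, total_cpu, total_memory, vms]
    #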
    def read_node(self):
        used_cpu = 0
        used_memory = 0
        try:
            self.c = libvirt.openReadOnly(self.addr)
            # getInfo(): [model, memory (MB), active CPUs, MHz, NUMA nodes, sockets, cores, threads]
            total_cpu = self.c.getInfo()[2]
            total_memory = self.c.getInfo()[1]
        except Exception, e:
            log.error(0, 'libvirt getting info: %s' % (e))
            return None
        vms = []

        try:
            domains = self.c.listDomainsID()
        except Exception, e:
            log.exception(0, 'libvirt listDomainsID: %s' % (str(e)))
            return None

        for domain_id in domains:
            try:
                hostname = self.c.getHostname()
                dom = self.c.lookupByID(domain_id)
                info = dom.info()  # struct virDomainInfo: [state, maxMem, memory, nrVirtCpu, cpuTime]
                used_cpu += info[3]
                used_memory += info[1]
                #self.xml_data = parseString(dom.XMLDesc(0))
                self.xml_data = dom.XMLDesc(0)
                #xml_nodes = self.xml_data.childNodes
                devs = re.search('<devices>(.*?)</devices>', self.xml_data, re.S)
                try:
                    #hdd = xml_nodes[0].getElementsByTagName("devices")[0].getElementsByTagName("disk")[0].getElementsByTagName("target")[0].getAttribute("dev")
                    #hdd_stat = dom.blockStats(hdd)
                    # blockStats(): (rd_req, rd_bytes, wr_req, wr_bytes, errs)
                    hdd = re.findall('<disk.*?<target dev=\'(.*?)\'.*?</disk>', devs.group(), re.S)
                    hdd_stat = dom.blockStats(hdd[0])
                except Exception:
                    hdd_stat = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
                try:
                    #net = xml_nodes[0].getElementsByTagName("devices")[0].getElementsByTagName("interface")[0].getElementsByTagName("target")[0].getAttribute("dev")
                    #net_stat = dom.interfaceStats(net)
                    # interfaceStats(): (rx_bytes, rx_packets, rx_errs, rx_drop, tx_bytes, tx_packets, tx_errs, tx_drop)
                    net = re.findall('<interface.*?<target dev=\'(.*?)\'.*?</interface>', devs.group(), re.S)
                    net_stat = dom.interfaceStats(net[0])
                except Exception:
                    net_stat = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

                vms.append({'name': dom.name(),
                            'id': domain_id,
                            'state': info[0],
                            'cpu_time': info[4],
                            'cpu_count': info[3],
                            'memory': info[2],
                            'rd_req': hdd_stat[0],
                            'rd_bytes': hdd_stat[1],
                            'wr_req': hdd_stat[2],
                            'wr_bytes': hdd_stat[3],
                            'rx_bytes': net_stat[0],
                            'rx_packets': net_stat[1],
                            'tx_bytes': net_stat[4],
                            'tx_packets': net_stat[5]})
            except Exception, e:
                log.exception(0, 'libvirt lookup (%s id=%d): %s' % (hostname, domain_id, str(e)))
                return None

        dom = None
        g = self.c.close()
        if g != 0:
            log.error(0, 'libvirt close error %s' % (str(g)))
        self.lv_data = [used_cpu, used_memory, total_cpu, total_memory, vms]
        return self.lv_data

    def kill(self):
        # invoked by the watchdog timer when TIMEOUT expires
        log.info(0, 'killing MonitorThread %s...' % (self.name))
        try:
            sys.exit()
        except Exception:
            log.info(0, 'MonitorThread %s error...' % (self.name))
        log.info(0, 'MonitorThread killed')