cc1  v2.1
CC1 source code docs
 All Classes Namespaces Files Functions Variables Pages
command.py
Go to the documentation of this file.
1 # -*- coding: utf-8 -*-
2 # @COPYRIGHT_begin
3 #
4 # Copyright [2010-2014] Institute of Nuclear Physics PAN, Krakow, Poland
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18 # @COPYRIGHT_end
19 ##
20 # @package src.cm.models.command
21 #
22 
23 import json
24 import threading
25 from time import sleep
26 
27 from django.db import models, transaction
28 
29 from cm.models.vm import VM
30 from cm.utils import log
31 from cm.utils.exception import CMException
32 from cm.utils.threads.vm import VMThread
33 from common import response
34 from common.states import command_states, vm_states, farm_states
35 from cm.utils import message
36 
37 
##
# Database model of a single command queued for a virtual machine.
#
# A command is stored with a name, JSON-encoded keyword arguments and a state
# taken from common.states.command_states. execute() creates such a row and
# then polls it until the state becomes 'finished' or 'failed'.
#
class Command(models.Model):
    name = models.CharField(max_length=1000)
    # keyword arguments of the command, JSON-encoded by add_command()
    args = models.CharField(max_length=100000)
    # one of the values of common.states.command_states
    state = models.IntegerField()
    response = models.CharField(max_length=100000, null=True)
    vm = models.ForeignKey(VM)

    class Meta:
        app_label = 'cm'

    def __unicode__(self):
        return "%s %s" % (self.name, self.args)

    ##
    #
    # @returns{dict} command's data
    # \n fields:
    # @dictkey{id,int} id of this Command
    # @dictkey{name,string} name of this Command
    # @dictkey{args} args of the Command (JSON string as stored by add_command)
    # @dictkey{state} @seealso{src.common.states.command_states}
    # @dictkey{response}
    #
    def dict(self):
        return {
            'id': self.id,
            'name': self.name,
            'args': self.args,
            'state': self.state,
            'response': self.response,
        }

    @staticmethod
    ##
    #
    # Stores a new Command in state 'pending' for machine @prm{vm_id}.
    # The keyword arguments are serialized to JSON and kept in the args field.
    #
    # @parameter{name,string} Command to add for machine @prm{vm_id}
    # @parameter{user_id,int} used here only for logging
    # @parameter{vm_id,int}
    # @parameter{kwargs,dict} key word args for the called function
    #
    # @returns{Command} the saved Command instance
    #
    def add_command(name, user_id, vm_id, **kwargs):
        cmd = Command()
        cmd.vm_id = vm_id
        cmd.name = name
        cmd.args = json.dumps(kwargs)
        cmd.state = command_states['pending']
        cmd.response = None
        log.debug(user_id, "Add command %s for machine %s" % (name, vm_id))
        cmd.save()
        return cmd

    @staticmethod
    ##
    #
    # Method executes command @prm{name} on the specified VM.
    # User with id @prm{user_id} must be the owner of that VM.
    #
    # The command is stored in the database, a key press is sent to the VM's
    # libvirt domain and the command row is then polled (3 attempts, with a
    # sleep growing by a factor of 1.2 between them) until it is 'finished'
    # or 'failed'. The command row is always deleted afterwards.
    #
    # @parameter{name,string} name of the function to execute
    # @parameter{user_id,long} id of the declared VM owner
    # @parameter{vm_id,int} id of the VM on which command needs to be executed
    # @parameter{kwargs,dict} keyword args for the called function
    #
    # @returns{string} response stored for the command, or '' when it is empty
    #
    # @raises{ctx_timeout,CMException} command did not finish within the retries
    # @raises{ctx_execute_command,CMException} any unexpected error
    # @raises{ctx_<name>,CMException} command ended in state 'failed'
    #
    def execute(name, user_id, vm_id, **kwargs):
        vm = VM.get(user_id, vm_id)

        try:
            cmd = Command.add_command(name, user_id, vm_id, **kwargs)
            transaction.commit()
            log.debug(user_id, "Command state %s for machine %s" % (cmd.state, vm_id))

            dom = vm.lv_domain()
            # sendKey(codeset, holdtime, keycodes, nkeycodes, flags)
            # NOTE(review): presumably the key press notifies the ctx module
            # inside the guest that a command is waiting - confirm.
            dom.sendKey(0, 500, [113], 1, 0)

            retry = 3
            retry_factor = 1.2
            retry_time = 1
            try:
                while retry > 0:
                    log.debug(user_id, "Check if command %s is finished for machine %s" % (cmd.id, vm_id))
                    # NOTE(review): update() without arguments looks like a way
                    # to force a fresh read of the row - confirm.
                    Command.objects.update()
                    cmd = Command.objects.get(id=cmd.id)
                    log.debug(user_id, "Checked command status: %s, %s, %s" % (cmd.state, command_states['finished'], bool(cmd.state == command_states['finished'])))
                    if cmd.state == command_states['finished']:
                        log.debug(user_id, "Response %s from machine %s" % (cmd.response, vm_id))
                        break
                    elif cmd.state == command_states['failed']:
                        raise CMException('ctx_' + name)
                    retry -= 1
                    retry_time *= retry_factor
                    sleep(retry_time)
            finally:
                # the command row is removed regardless of the outcome
                cmd.delete()

            # retry reaches 0 only when the loop ran out of attempts
            if retry == 0:
                log.debug(user_id, "Command %s for machine %s - TIMEOUT" % (name, vm_id))
                raise CMException('ctx_timeout')

            return cmd.response or ''
        except CMException:
            raise
        except Exception:
            log.exception(user_id, 'Execute command')
            raise CMException('ctx_execute_command')

    @staticmethod
    ##
    #
    # First function which must be called by VMs ctx module. It registers VM with status 'running ctx',
    # also serves a special role when creating farms (tracking head, and worker nodes)
    #
    # @parameter{vm_ip,string} ip address used to look up the VM
    # @parameter{args} keyword args; only 'version' is read here (stored as ctx_api_version)
    #
    # @returns 'ok' on success, or a 'ctx_error' response when the VM cannot be saved
    #
    def hello(vm_ip, **args):
        vm = VM.get_by_ip(vm_ip)
        log.debug(vm.user_id, "Hello from vm %d ip: %s" % (vm.id, vm_ip))

        vm.ctx_api_version = args.get('version', None)
        vm.state = vm_states['running ctx']

        if vm.ssh_username and vm.ssh_key:
            Command.execute('add_ssh_key', vm.user_id, vm.id, user=vm.ssh_username, ssh_key=vm.ssh_key)

        if vm.is_head():
            Command.register_head(vm)
        elif vm.is_farm():
            Command.register_node(vm)

        try:
            vm.save(update_fields=['state', 'ctx_api_version'])
        except Exception as e:
            log.error(vm.user_id, "Cannot update database for vm %d: %s" % (vm.id, e.message))
            return response('ctx_error', "Cannot update database: %s" % e.message)

        return 'ok'

    @staticmethod
    ##
    #
    # Replaces every occurrence of the farm name in the vm name with the word "farm".
    #
    # @parameter{farm_name,string} farm name
    # @parameter{vm_name,string} vm name
    #
    # @returns{string} vm name with the farm name substituted
    #
    def name_to_farm(farm_name, vm_name):
        return vm_name.replace(farm_name, 'farm')

    @staticmethod
    ##
    #
    # Called from CLM when registering worker nodes of the farm.
    # Executes 'add_ssh_key', 'update_hosts' and 'set_hostname' on the node.
    #
    # @parameter{vm,vm} VM database mapper
    #
    # @raises{Exception} 'configuring farm failed' when any of the steps fails
    #
    def register_node(vm):
        log.debug(vm.user_id, "machine %d: registered as worker node" % vm.id)

        try:
            hosts = vm.farm.hosts()
            log.debug(vm.user_id, "vm: %d, host list to inject into WNs: %s" % (vm.id, str(hosts)))

            Command.execute('add_ssh_key', vm.user_id, vm.id, user=vm.ssh_username, ssh_key=vm.ssh_key)
            Command.execute('update_hosts', vm.user_id, vm.id, hosts_list=hosts, user=vm.ssh_username)
            Command.execute('set_hostname', vm.user_id, vm.id, hostname=Command.name_to_farm(vm.farm.name, vm.name))

        except Exception:
            log.exception(vm.user_id, 'configuring farm failed for machine %d' % vm.id)
            raise Exception('configuring farm failed')
        log.info(vm.user_id, 'WN %d registered' % vm.id)

    @staticmethod
    ##
    #
    # Head registration process:
    # - Creates ssh keys and sets their values for WN;
    # - Inserts VMs into the database;
    # - Then starts VMThreads which create actual machines.
    #
    # Called when registering farms head.
    # Does nothing when the farm is already in state 'running' (head reboot).
    #
    # @parameter{vm,VM} instance of the VM to be registered as head
    #
    def register_head(vm):
        log.debug(vm.user_id, "machine %d: registered as head" % vm.id)

        log.debug(vm.user_id, "creating lock for machine %d in farm %d" % (vm.id, vm.farm_id))
        # skip if farm is already configured - reboot head
        if vm.is_head() and vm.farm.state == farm_states['running']:
            return

        # worker nodes for which creation threads are started below
        vms = []
        # NOTE(review): key generation is assumed to belong to the 'init_head'
        # branch - confirm against the original indentation.
        if vm.farm.state == farm_states['init_head']:
            vm.farm.state = farm_states['running']
            vm.farm.save()

            log.info(vm.user_id, 'generating ssh keys on head %d' % vm.id)

            try:
                ssh_username = 'root'
                ssh_key = json.loads(Command.execute('generate_key', vm.user_id, vm.id))
                log.info(vm.user_id, 'generated key: %s for machine %d' % (ssh_key, vm.id))
                # every machine of the farm (head included) gets the generated key
                for wn in vm.farm.vms.all():
                    wn.ssh_username = ssh_username
                    wn.ssh_key = ssh_key
                    wn.save()
                    if not wn.is_head():
                        vms.append(wn)
                log.debug(vm.user_id, 'appended %d vms to farm [id:%d]' % (vm.farm.vms.count() - 1, vm.farm_id))  # excluding head

                # the first two commands are only queued; set_hostname is executed and awaited
                Command.add_command('add_ssh_key', vm.user_id, vm.id, user=ssh_username, ssh_key=ssh_key)
                Command.add_command('update_hosts', vm.user_id, vm.id, hosts_list=vm.farm.hosts(), user=ssh_username)
                Command.execute('set_hostname', vm.user_id, vm.id, hostname=Command.name_to_farm(vm.farm.name, vm.name))

            except Exception:
                log.exception(vm.user_id, '')
                # NOTE(review): the new state is only set on the in-memory farm
                # object; no save() follows in this function - confirm whether
                # it should be persisted.
                vm.farm.state = farm_states['unconfigured']
                message.error(vm.id, 'farm_create', {'id': vm.farm.id, 'name': vm.farm.name})
        log.info(vm.user_id, 'Head %d registered' % vm.id)
        # state shared by all creation threads: a counter and a lock
        shared = {"counter": len(vms), "lock": threading.Lock()}
        for wn in vms:
            thread = VMThread(wn, 'create', shared)
            thread.start()
            log.debug(wn.user_id, 'vm thread created [vm id:%d]' % wn.id)
277