25 from time
import sleep
27 from django.db
import models, transaction
29 from cm.models.vm
import VM
30 from cm.utils
import log
31 from cm.utils.exception
import CMException
32 from cm.utils.threads.vm
import VMThread
33 from common
import response
34 from common.states
import command_states, vm_states, farm_states
35 from cm.utils
import message
39 name = models.CharField(max_length=1000)
40 args = models.CharField(max_length=100000)
41 state = models.IntegerField()
42 response = models.CharField(max_length=100000, null=
True)
43 vm = models.ForeignKey(VM)
49 return "%s %s" % (self.
name, self.
args)
66 r[
'state'] = self.
state
82 cmd.args = json.dumps(kwargs)
83 cmd.state = command_states[
'pending']
85 log.debug(user_id,
"Add command %s for machine %s" % (name, vm_id))
104 vm = VM.get(user_id, vm_id)
107 cmd = Command.add_command(name, user_id, vm_id, **kwargs)
109 log.debug(user_id,
"Command state %s for machine %s" % (cmd.state, vm_id))
113 dom.sendKey(0, 500, [113], 1, 0)
120 log.debug(user_id,
"Check if command %s is finished for machine %s" % (cmd.id, vm_id))
124 Command.objects.update()
125 cmd = Command.objects.get(id=cmd.id)
126 log.debug(user_id,
"Checked command status: %s, %s, %s" % (cmd.state, command_states[
'finished'], bool(cmd.state == command_states[
'finished'])))
127 if cmd.state == command_states[
'finished']:
128 log.debug(user_id,
"Response %s from machine %s" % (cmd.response, vm_id))
131 elif cmd.state == command_states[
'failed']:
132 raise CMException(
'ctx_' + name)
134 retry_time *= retry_factor
142 log.debug(user_id,
"Command %s for machine %s - TIMEOUT" % (name, vm_id))
143 raise CMException(
'ctx_timeout')
145 return cmd.response
or ''
149 log.exception(user_id,
'Execute command')
150 raise CMException(
'ctx_execute_command')
162 vm = VM.get_by_ip(vm_ip)
163 log.debug(vm.user_id,
"Hello from vm %d ip: %s" % (vm.id, vm_ip))
165 vm.ctx_api_version = args.get(
'version',
None)
166 vm.state = vm_states[
'running ctx']
168 if vm.ssh_username
and vm.ssh_key:
169 Command.execute(
'add_ssh_key', vm.user_id, vm.id, user=vm.ssh_username, ssh_key=vm.ssh_key)
172 Command.register_head(vm)
174 Command.register_node(vm)
177 vm.save(update_fields=[
'state',
'ctx_api_version'])
179 log.error(vm.user_id,
"Cannot update database for vm %d: %s" % (vm.id, e.message))
180 return response(
'ctx_error',
"Cannot update database: %s" % e.message)
193 return vm_name.replace(farm_name,
'farm')
203 log.debug(vm.user_id,
"machine %d: registered as worker node" % vm.id)
208 hosts = vm.farm.hosts()
209 log.debug(vm.user_id,
"vm: %d, host list to inject into WNs: %s" % (vm.id, str(hosts)))
211 Command.execute(
'add_ssh_key', vm.user_id, vm.id, user=vm.ssh_username, ssh_key=vm.ssh_key)
212 Command.execute(
'update_hosts', vm.user_id, vm.id, hosts_list=hosts, user=vm.ssh_username)
213 Command.execute(
'set_hostname', vm.user_id, vm.id, hostname=vm.name.replace(vm.farm.name,
'farm'))
216 log.exception(vm.user_id,
'configuring farm failed for machine %d' % vm.id)
217 raise Exception(
'configuring farm failed')
218 log.info(vm.user_id,
'WN %d registered' % vm.id)
233 log.debug(vm.user_id,
"machine %d: registered as head" % vm.id)
235 log.debug(vm.user_id,
"creating lock for machine %d in farm %d" % (vm.id, vm.farm_id))
238 if vm.is_head() ==
True and vm.farm.state == farm_states[
'running']:
242 if vm.farm.state == farm_states[
'init_head']:
243 vm.farm.state = farm_states[
'running']
246 log.info(vm.user_id,
'generating ssh keys on head %d' % vm.id)
249 r = Command.execute(
'generate_key', vm.user_id, vm.id)
251 log.info(vm.user_id,
'generated key: %s for machine %d' % (r, vm.id))
253 for wn
in vm.farm.vms.all():
254 wn.ssh_username =
'root'
259 ssh_username =
'root'
261 log.debug(vm.user_id,
'appended %d vms to farm [id:%d]' % (vm.farm.vms.count() - 1, vm.id))
263 Command.add_command(
'add_ssh_key', vm.user_id, vm.id, user=ssh_username, ssh_key=ssh_key)
264 Command.add_command(
'update_hosts', vm.user_id, vm.id, hosts_list=vm.farm.hosts(), user=ssh_username)
265 Command.execute(
'set_hostname', vm.user_id, vm.id, hostname=vm.name.replace(vm.farm.name,
'farm'))
268 log.exception(vm.user_id,
'')
269 vm.farm.state = farm_states[
'unconfigured']
270 message.error(vm.id,
'farm_create', {
'id': vm.farm.id,
'name': vm.farm.name})
271 log.info(vm.user_id,
'Head %d registered' % vm.id)
272 shared = {
"counter": len(vms),
"lock": threading.Lock()}
274 thread = VMThread(vm,
'create', shared)
276 log.debug(vm.user_id,
'vm thread created [vm id:%d]' % vm.id)