# Files
# PushBlendPull/worker.py
#
# 481 lines
# 16 KiB
# Python

import os
import re
import time
import json
import logging
from queue import Queue, Empty
from threading import Thread, Event
from subprocess import Popen, PIPE, STDOUT
from twisted.internet import reactor
from twisted.web import server, resource, http
# Configure the root logger at import time with the INFO threshold; the
# handlers and the worker below log their progress at INFO level.
logging.basicConfig(level=logging.INFO)
class Index(resource.Resource):
    """Root resource of the site; a GET on it returns the HTML test page."""

    # Not a leaf: lookups for further path segments go through getChild().
    isLeaf = False

    def getChild(self, path, request):
        """Return this resource for the empty path segment.

        Any other segment is handed to the base ``Resource.getChild``.
        """
        if path != b'':
            return super().getChild(path, request)
        return self

    def render_GET(self, request):
        """Return the page produced by ``get_index_content()``."""
        return get_index_content()
class StatusResource(resource.Resource):
    """Leaf resource that reports job status.

    A GET whose ``Accept`` header mentions ``text/event-stream`` is kept open
    and receives every status passed to :meth:`publish` as a Server-Sent
    Event.  Any other GET receives the last published status as JSON.
    """

    isLeaf = True

    def __init__(self):
        resource.Resource.__init__(self)
        # Requests currently held open as event streams.
        self.subscriptions = []
        # Most recent status passed to publish(); served to plain GETs.
        self.last_status = ('idle',)

    def response_callback(self, err, request):
        """Forget a subscriber once its request has finished or failed.

        Registered with ``addBoth`` on ``request.notifyFinish()``, so ``err``
        is either the callback result or the failure.  The request is already
        over when this fires, so it is only dropped from the list and never
        finished again here.
        """
        logging.info('Request Handler: connection either resulted in error or client closed')
        # Guard the removal: a request that is not registered must not raise.
        if request in self.subscriptions:
            self.subscriptions.remove(request)

    def add_event_stream_headers(self, request):
        """Set the headers of an event-stream response and return the request."""
        request.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
        request.setHeader("Access-Control-Allow-Origin", "*")
        return request

    def add_json_headers(self, request):
        """Set the headers of a JSON response and return the request."""
        request.setHeader('Content-Type', 'application/json; charset=utf-8')
        return request

    def publish(self, status):
        """Remember ``status`` and push it to every open event stream."""
        self.last_status = status
        # Iterate over a snapshot so a subscriber removed meanwhile cannot
        # disturb the loop.
        for req in list(self.subscriptions):
            self.push_sse_mesage(req=req, msg=status)

    def push_sse_mesage(self, req, msg):
        """Write ``msg`` to ``req`` as one JSON-encoded SSE ``data:`` event."""
        # The blank line after the data line terminates the event.
        event_line = "data: {}\r\n\r\n".format(json.dumps(msg))
        req.write(event_line.encode())

    def render_GET(self, request):
        """Open an event stream or return the last status as JSON.

        Returns:
            ``server.NOT_DONE_YET`` for an event-stream subscriber, otherwise
            the JSON-encoded last status as bytes.
        """
        # getRawHeaders returns None when the header is absent.
        accept_values = request.requestHeaders.getRawHeaders(b'accept') or []
        # Substring match so a combined value such as
        # "text/event-stream, */*" is recognised too.
        wants_stream = any(b'text/event-stream' in value for value in accept_values)
        if not wants_stream:
            self.add_json_headers(request)
            return json.dumps(self.last_status).encode()

        self.add_event_stream_headers(request)
        # An empty write sends the response headers right away without a body.
        request.write(b"")
        request.notifyFinish().addBoth(self.response_callback, request)
        self.subscriptions.append(request)
        logging.info('Request Handler: new subscriber ({})'.format(len(self.subscriptions)))
        return server.NOT_DONE_YET
class JobResource(resource.Resource):
    """Leaf resource used to submit render jobs (POST) and list them (GET)."""

    isLeaf = True
    PARAM_BLEND_FILE = b'blend_file'
    PARAM_SCENE = b'scene'
    PARAM_START_FRAME = b'start_frame'
    PARAM_END_FRAME = b'end_frame'

    def __init__(self, worker):
        resource.Resource.__init__(self)
        # Object exposing enqueue_job / get_current_job / get_job_queue.
        self.worker = worker

    @classmethod
    def get_param_blend_file(cls, request):
        """Return the requested blend file path.

        Raises:
            ValueError: the parameter is missing or does not end in ``.blend``
                (also raised by a failed UTF-8 decode).
            FileNotFoundError: the path does not name an existing file.
        """
        values = request.args.get(cls.PARAM_BLEND_FILE)
        if not values or not values[0].endswith(b'.blend'):
            raise ValueError("missing or invalid blend_file parameter")
        blend_file = values[0].decode("utf-8")
        if not os.path.isfile(blend_file):
            raise FileNotFoundError(blend_file)
        return blend_file

    @classmethod
    def get_param_scene(cls, request):
        """Return the scene name, or None when it is absent or empty."""
        values = request.args.get(cls.PARAM_SCENE)
        if not values or values[0] == b'':
            return None
        return values[0].decode("utf-8")

    @classmethod
    def _get_int_param(cls, request, name):
        """Return parameter ``name`` as an int, or None when absent or empty.

        Raises:
            ValueError: the value is not made of digits only.
        """
        values = request.args.get(name)
        if not values or values[0] == b'':
            return None
        raw = values[0]
        if not raw.isdigit():
            raise ValueError("parameter {!r} must be a non-negative integer".format(name))
        return int(raw)

    @classmethod
    def get_param_start_frame(cls, request):
        """Return the first frame to render, or None when not supplied."""
        return cls._get_int_param(request, cls.PARAM_START_FRAME)

    @classmethod
    def get_param_end_frame(cls, request):
        """Return the last frame to render, or None when not supplied."""
        return cls._get_int_param(request, cls.PARAM_END_FRAME)

    def prepare_job(self, request):
        """Build a ``Job`` from the request parameters.

        Propagates the ValueError / FileNotFoundError raised by the
        ``get_param_*`` helpers.
        """
        return Job(
            self.get_param_blend_file(request),
            scene=self.get_param_scene(request),
            start_frame=self.get_param_start_frame(request),
            end_frame=self.get_param_end_frame(request)
        )

    def render_POST(self, request):
        """Validate the request, enqueue the job and answer ``OK``.

        Invalid parameters yield a 400 page and a missing blend file the
        ``NoResource`` page.
        """
        try:
            job = self.prepare_job(request)
        except ValueError:
            # A render method must return bytes, so the error page is rendered
            # here (which also sets the response code) instead of returning
            # the resource object itself.
            return resource.ErrorPage(
                http.BAD_REQUEST, "Bad request", "Error in the request's format"
            ).render(request)
        except FileNotFoundError:
            return resource.NoResource("Blend file not found").render(request)
        logging.info('Request Handler: received a job request: {}'.format(job))
        self.worker.enqueue_job(job)
        return b'OK'

    def render_GET(self, request):
        """Return the current job and the pending queue as a JSON document."""
        request.setHeader('Content-Type', 'application/json; charset=utf-8')
        return json.dumps({
            "current_job": str(self.worker.get_current_job()),
            "queue": [str(job) for job in self.worker.get_job_queue()]
        }).encode()
class CancelResource(resource.Resource):
    """Leaf resource that cancels queued jobs and/or the running job (POST)."""

    isLeaf = True
    PARAM_TYPE = b'type'
    PARAM_JOBS = b'jobs'
    CANCEL_TYPE_QUEUED_JOBS = b'queued_jobs'
    CANCEL_TYPE_QUEUE = b'queue'
    CANCEL_TYPE_CURRENT_JOB = b'current_job'
    CANCEL_TYPE_ALL = b'all'

    def __init__(self, worker):
        resource.Resource.__init__(self)
        # Object exposing the queue-editing and cancel operations used below.
        self.worker = worker

    @classmethod
    def get_param_type(cls, request):
        """Return the requested cancel type.

        Raises:
            ValueError: the parameter is missing or not one of the
                ``CANCEL_TYPE_*`` values.
        """
        values = request.args.get(cls.PARAM_TYPE)
        if not values:
            raise ValueError("missing type parameter")
        param_type = values[0]
        available_types = (cls.CANCEL_TYPE_QUEUED_JOBS, cls.CANCEL_TYPE_QUEUE,
                           cls.CANCEL_TYPE_CURRENT_JOB, cls.CANCEL_TYPE_ALL)
        if param_type not in available_types:
            raise ValueError("unknown cancel type")
        return param_type

    @classmethod
    def get_param_jobs(cls, request):
        """Return the ``jobs`` values decoded as text.

        Raises:
            ValueError: the parameter is missing (also raised by a failed
                UTF-8 decode).
        """
        if cls.PARAM_JOBS not in request.args:
            raise ValueError("missing jobs parameter")
        return [job.decode("utf-8") for job in request.args[cls.PARAM_JOBS]]

    def render_POST(self, request):
        """Perform the requested cancellation and answer ``OK``.

        A malformed request yields a 400 page and cancels nothing.
        """
        logging.info('Request Handler: received cancel request')
        try:
            type_param = self.get_param_type(request)
            jobs = None
            if type_param == self.CANCEL_TYPE_QUEUED_JOBS:
                jobs = self.get_param_jobs(request)
        except ValueError:
            # A render method must return bytes, so the error page is rendered
            # here (which also sets the response code) instead of returning
            # the resource object itself.
            return resource.ErrorPage(
                http.BAD_REQUEST, "Bad request", "Error in the request's format"
            ).render(request)

        if type_param == self.CANCEL_TYPE_QUEUED_JOBS:
            self.worker.remove_job_list_from_queue(jobs)
        elif type_param == self.CANCEL_TYPE_QUEUE:
            self.worker.clear_job_queue()
        elif type_param == self.CANCEL_TYPE_CURRENT_JOB:
            self.worker.cancel_current_job()
        elif type_param == self.CANCEL_TYPE_ALL:
            # Empty the queue first so no pending job starts once the running
            # one has been cancelled.
            self.worker.clear_job_queue()
            self.worker.cancel_current_job()
        return b'OK'
def main():
    """Build the resource tree, start the job worker and run the reactor.

    Blocks in ``reactor.run()`` until the reactor stops; the worker is asked
    to stop just before reactor shutdown.
    """
    host = ''  # empty string: listen on all interfaces
    port = 8025

    status_output = StatusResource()
    worker = QueueConsumer(status_output)

    root = Index()
    root.putChild(b'status', status_output)
    root.putChild(b'job', JobResource(worker))
    root.putChild(b'cancel', CancelResource(worker))

    site = server.Site(root)
    # Pass the host explicitly so the address bound is the one logged below.
    reactor.listenTCP(port, site, interface=host)
    worker.start_worker()
    reactor.addSystemEventTrigger("before", "shutdown", worker.stop_worker)
    logging.info('Starting server {}:{}'.format(host or '*', port))
    reactor.run()
class QueueConsumer:
    """Run queued render jobs one at a time on a background thread.

    Each job is executed as a Blender subprocess whose output is scanned for
    frame markers.  Progress, completion and cancellation are reported through
    ``status_output.publish``.
    """

    _BLENDER_EXECUTABLE = "/usr/bin/blender"
    # Frame marker searched for in each line of the subprocess output.
    _FRAME_STATUS_PATTERN = re.compile(r"Fra:\d+")
    # Pause between exit checks once the output stream has reached EOF.
    _EXIT_POLL_INTERVAL = 0.1

    def __init__(self, status_output):
        self._job_queue = EditableQueue()
        self._stop_working = Event()
        self._cancel_job = Event()
        self._status_output = status_output
        self._last_output_status = ''
        self._current_job = None

    def start_worker(self):
        """Start the thread that consumes the job queue."""
        self._stop_working.clear()
        worker_thread = Thread(target=self._consume_queue)
        worker_thread.start()

    def stop_worker(self):
        """Ask the consumer thread to exit once it is between jobs."""
        self._stop_working.set()

    def enqueue_job(self, job):
        """Append ``job`` to the queue of pending jobs."""
        self._job_queue.put(job)

    def get_job_queue(self):
        """Return a list snapshot of the pending jobs."""
        return self._job_queue.as_list()

    def get_current_job(self):
        """Return the job being executed, or None when idle."""
        return self._current_job

    def cancel_current_job(self):
        """Request cancellation of the running job; no-op when idle."""
        # Only latch the flag while a job is active, otherwise it would stay
        # set and cancel the next job as soon as it starts.
        if self._current_job is not None:
            self._cancel_job.set()

    def remove_job_list_from_queue(self, job_list):
        """Remove the given pending jobs and publish each one as canceled."""
        removed_jobs = self._job_queue.remove_sublist(job_list)
        for job in removed_jobs:
            self._status_output.publish((str(job), Job.STATUS_CANCELED,))

    def clear_job_queue(self):
        """Remove every pending job and publish each one as canceled."""
        removed_jobs = self._job_queue.clear()
        for job in removed_jobs:
            self._status_output.publish((str(job), Job.STATUS_CANCELED,))

    def _publish_from_worker(self, status):
        """Publish ``status`` from the worker thread.

        ``publish`` writes to Twisted requests, which must happen in the
        reactor thread, so the call is handed over with ``callFromThread``.
        """
        reactor.callFromThread(self._status_output.publish, status)

    def _consume_queue(self):
        """Worker loop: execute jobs until ``stop_worker`` is called."""
        while not self._stop_working.is_set():
            try:
                # Time out regularly so the stop flag is re-checked.
                job = self._job_queue.get(timeout=1)
            except Empty:
                continue
            self._current_job = job
            try:
                self._execute_job()
            except Exception:
                # Thread boundary: a failing job (e.g. the executable cannot
                # be started) must not end the worker loop.
                logging.exception('Worker: job failed: {}'.format(job))
            finally:
                self._current_job = None
                # Drop a cancel request that raced with the end of the job.
                self._cancel_job.clear()

    def _execute_job(self):
        """Run the current job and publish progress and the final status.

        NOTE: the cancel flag is checked between output lines only, because
        ``readline`` blocks until the subprocess writes a line or closes its
        output.
        """
        job = self._current_job
        logging.info('Worker: starting a new job: {}'.format(job))
        # Start every job with a clean slate so its first frame marker is
        # reported even if it equals the last marker of the previous job.
        self._last_output_status = ''
        start_time = time.time()
        elapsed = None  # set only when the subprocess ended by itself
        process = Popen([self._BLENDER_EXECUTABLE] + job.get_blender_args(),
                        stdout=PIPE, stderr=STDOUT, universal_newlines=True, bufsize=1)
        try:
            while not self._cancel_job.is_set():
                line = process.stdout.readline()
                if line != '':
                    status = self._parse_stdout(line)
                    if status is not None:
                        logging.info('Worker: job status is: {}'.format(status))
                        self._publish_from_worker((str(job), status,))
                    continue
                # Empty read means EOF on the output stream.
                if process.poll() is not None:
                    elapsed = time.time() - start_time
                    break
                # Output closed but process still alive: wait briefly (waking
                # early on cancel) instead of spinning.
                self._cancel_job.wait(self._EXIT_POLL_INTERVAL)
            if elapsed is None:
                logging.info('Worker: canceling: {}'.format(job))
        finally:
            # Whatever the exit path, stop the child if needed, close the pipe
            # and reap the process so nothing is leaked.
            if process.poll() is None:
                process.kill()
            process.stdout.close()
            process.wait()

        if elapsed is None:
            self._cancel_job.clear()
            self._publish_from_worker((str(job), Job.STATUS_CANCELED,))
            return
        if process.returncode != 0:
            logging.warning('Worker: job exited with code {}: {}'.format(process.returncode, job))
        logging.info('Worker: job finished: {}'.format(job))
        self._publish_from_worker((str(job), Job.STATUS_DONE, elapsed,))

    def _parse_stdout(self, line):
        """Return the frame marker found in ``line`` if it is a new one.

        Returns None when the line has no marker or repeats the marker
        returned last.
        """
        logging.debug('Worker: getting output: {}'.format(line))
        match = self._FRAME_STATUS_PATTERN.search(line)
        if match is None:
            return None
        status = match.group(0)
        if status == self._last_output_status:
            return None
        self._last_output_status = status
        return status
class EditableQueue(Queue):
    """``Queue`` whose pending items can be listed and removed in place.

    Every method takes the queue's own mutex, so it can be called while other
    threads use ``put`` / ``get``.
    """

    def _forget_removed(self, count):
        """Update the Queue bookkeeping after ``count`` items were removed.

        Must be called with ``self.mutex`` held.  Removed items will never be
        acknowledged with ``task_done``, so they are subtracted from the
        unfinished count (otherwise ``join`` could wait forever), and
        producers blocked on a full queue are woken up.
        """
        if not count:
            return
        self.unfinished_tasks -= count
        if self.unfinished_tasks <= 0:
            self.unfinished_tasks = 0
            self.all_tasks_done.notify_all()
        self.not_full.notify_all()

    def remove_sublist(self, item_list):
        """Remove each entry of ``item_list`` that is queued.

        Entries are matched with ``==``; entries not found are ignored.

        Returns:
            The entries of ``item_list`` that were actually removed.
        """
        removed_items = []
        with self.mutex:
            for item in item_list:
                try:
                    self.queue.remove(item)
                except ValueError:
                    # Not queued (any more): nothing to remove.
                    continue
                removed_items.append(item)
            self._forget_removed(len(removed_items))
        return removed_items

    def clear(self):
        """Remove every queued item and return them in queue order."""
        with self.mutex:
            removed_items = list(self.queue)
            self.queue.clear()
            self._forget_removed(len(removed_items))
        return removed_items

    def as_list(self):
        """Return a list snapshot of the queued items, in queue order."""
        with self.mutex:
            return list(self.queue)
class Job:
    """Description of one render job: a blend file plus optional settings.

    Optional keyword arguments: ``scene``, ``start_frame`` and ``end_frame``;
    each defaults to None and other keywords are ignored.
    """

    STATUS_DONE = 'done'
    STATUS_CANCELED = 'canceled'

    def __init__(self, blend_file, **kwargs):
        self.blend_file = blend_file
        self.scene = kwargs.get("scene")
        self.start_frame = kwargs.get("start_frame")
        self.end_frame = kwargs.get("end_frame")

    def __str__(self):
        """Return ``<blend_file>><scene> <start>:<end>`` with placeholders for unset fields."""
        return "{}>{} {}:{}".format(
            self.blend_file,
            self.scene if self.scene is not None else "default_scene",
            self.start_frame if self.start_frame is not None else "start",
            self.end_frame if self.end_frame is not None else "end"
        )

    def __repr__(self):
        return "{}({!r}, scene={!r}, start_frame={!r}, end_frame={!r})".format(
            type(self).__name__, self.blend_file, self.scene,
            self.start_frame, self.end_frame
        )

    def get_blender_args(self):
        """Return the command-line arguments describing this job.

        The optional settings are emitted only when set, and
        ``--render-anim`` is always the last argument.
        """
        args = ['--background', self.blend_file]
        if self.scene is not None:
            args.extend(['--scene', self.scene])
        if self.start_frame is not None:
            args.extend(['--frame-start', str(self.start_frame)])
        if self.end_frame is not None:
            args.extend(['--frame-end', str(self.end_frame)])
        args.append('--render-anim')
        return args

    def __eq__(self, other):
        """Compare with another ``Job`` field by field, or with a string.

        A string is equal to the job when it matches ``str(self)``, which
        lets a job be looked up by its textual form.
        """
        if isinstance(other, str):
            return other == str(self)
        if not isinstance(other, Job):
            return NotImplemented
        return (other.blend_file == self.blend_file
                and other.scene == self.scene
                and other.start_frame == self.start_frame
                and other.end_frame == self.end_frame)
def get_index_content():
    """Return the HTML test page as bytes.

    The page offers a job form posting to ``/job``, a cancel form posting to
    ``/cancel``, a list fed by the ``/status`` event stream and a job selector
    refreshed from ``GET /job``.

    Changes to the markup: the unterminated ``toto[tata]`` input, which
    swallowed the following ``queued_jobs`` radio button, is gone, and status
    messages are inserted as text rather than as HTML because they contain
    values taken from job requests.
    """
    return '''<html>
<head>
</head>
<body>
    <input type="submit" value="track_status" onclick="return start_event();" />
    <ul id="events"></ul>
    <form action="/job" method="post" target="_blank" onsubmit="return submit_job()" id="form_job">
        <input type="radio" name="blend_file" value="/home/ewandor/projects/dev/blender/PushBlendPull/tests/pitou.blend" checked>pitou<br/>
        <input type="radio" name="blend_file" value="/home/ewandor/projects/dev/blender/PushBlendPull/tests/thom.blend">thom<br/>
        scene<input type="text" name="scene" value="pitou"><br/>
        start_frame<input type="text" name="start_frame" value="1"><br/>
        end_frame<input type="text" name="end_frame" value="50"><br/>
        <input type="submit" value="send_job"/>
    </form>
    <form action="/cancel" method="post" target="_blank" onsubmit="return submit_cancel()" id="form_cancel">
        <input type="radio" name="type" value="queued_jobs" id="queued_jobs">queued_jobs<br/>
        <input type="radio" name="type" value="queue">queue<br/>
        <input type="radio" name="type" value="current_job" checked>current_job<br/>
        <input type="radio" name="type" value="all">all<br/>
        <select multiple name="jobs" id="jobs"></select>
        <input type="submit" value="cancel"/>
    </form>
    <script type="text/javascript">
        var listening = false;
        function submit_job() {
            submit_ajax(
                document.getElementById("form_job"),
                function() {start_event();}
            );
            return false;
        }
        function submit_cancel() {
            submit_ajax(
                document.getElementById("form_cancel"),
                function() {update_queue();}
            );
            return false;
        }
        function submit_ajax(form, callback) {
            var request = new XMLHttpRequest();
            request.onreadystatechange = function() {
                if (request.readyState == XMLHttpRequest.DONE && request.status == 200) {
                    callback();
                }
            };
            request.open("POST", form.action);
            request.send(new FormData(form));
            update_queue()
        }
        function start_event() {
            if (!listening) {
                var eventList = document.getElementById("events");
                while (eventList.firstChild) {
                    eventList.removeChild(eventList.firstChild);
                }
                var evtSource = new EventSource("/status");
                evtSource.onmessage = function(e) {
                    var newElement = document.createElement("li");
                    newElement.textContent = "message: " + e.data;
                    eventList.appendChild(newElement);
                    if (eventList.childNodes.length > 5) {
                        eventList.removeChild(eventList.firstChild);
                    }
                }
                listening = true;
            }
        }
        function update_queue() {
            var request = new XMLHttpRequest();
            request.onreadystatechange = function() {
                if (request.readyState == XMLHttpRequest.DONE && request.status == 200) {
                    var response = JSON.parse(request.responseText);
                    var select = document.getElementById("jobs");
                    while (select.firstChild) {
                        select.removeChild(select.firstChild);
                    }
                    response.queue.forEach(function(job) {
                        var newElement = document.createElement("option");
                        newElement.text = job;
                        newElement.value = job;
                        select.appendChild(newElement);
                    });
                }
            };
            request.open('GET', "/job", true);
            request.send(null);
        }
        document.addEventListener("DOMContentLoaded", function(event) {
            document.getElementById("queued_jobs").addEventListener("click", update_queue, false);
        }, false);
    </script>
</body>
</html>'''.encode()
# Run the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()