481 lines
16 KiB
Python
481 lines
16 KiB
Python
|
|
import os
|
|
import re
|
|
import time
|
|
import json
|
|
import logging
|
|
|
|
from queue import Queue, Empty
|
|
from threading import Thread, Event
|
|
from subprocess import Popen, PIPE, STDOUT
|
|
|
|
from twisted.internet import reactor
|
|
from twisted.web import server, resource, http
|
|
|
|
# Configure the root logger at import time with the INFO threshold; every
# `logging.info(...)` call in this module goes through this configuration.
logging.basicConfig(level=logging.INFO)
|
|
|
|
|
|
class Index(resource.Resource):
    """Root resource of the site.

    Serves the HTML control page for GET requests and is the parent under
    which the other resources are registered (see ``putChild`` calls in
    ``main``).
    """

    isLeaf = False

    def getChild(self, path, request):
        """Resolve a child that was not registered with ``putChild``.

        An empty path segment resolves to this resource itself; any other
        segment is delegated to the base class implementation.
        """
        if path != b'':
            return resource.Resource.getChild(self, path, request)
        return self

    def render_GET(self, request):
        """Return the control page produced by ``get_index_content``."""
        page = get_index_content()
        return page
|
|
|
|
|
|
class StatusResource(resource.Resource):
    """Expose the latest job status, either as JSON or as a server-sent event stream.

    A GET whose ``Accept`` header mentions ``text/event-stream`` is kept open
    and receives every status passed to :meth:`publish`. Any other GET gets
    the last published status as a JSON document.
    """

    isLeaf = True

    def __init__(self):
        # Initialise the base class as well so its own attributes exist.
        resource.Resource.__init__(self)
        # Requests currently subscribed to the event stream.
        self.subscriptions = []
        # Last status handed to publish(); served to non-streaming clients.
        self.last_status = ('idle',)

    def response_callback(self, err, request):
        """Forget a subscriber once its request has ended.

        Registered through ``request.notifyFinish().addBoth(...)``, so ``err``
        is whatever result the deferred fired with (unused here) and
        ``request`` is the subscriber to drop.

        The original body contained a bare ``request.finish`` expression,
        which only looked the attribute up and never called it. It is
        dropped rather than turned into a call, because this runs after the
        request has already ended (NOTE(review): per Twisted's documented
        ``notifyFinish`` semantics, confirm for the version in use).
        """
        logging.info('Request Handler: connection either resulted in error or client closed')
        # Guard the removal: list.remove raises ValueError when the request
        # is no longer registered.
        if request in self.subscriptions:
            self.subscriptions.remove(request)

    def add_event_stream_headers(self, request):
        """Set the headers of a server-sent event response and return the request."""
        request.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
        request.setHeader("Access-Control-Allow-Origin", "*")
        return request

    def add_json_headers(self, request):
        """Set the headers of a JSON response and return the request."""
        request.setHeader('Content-Type', 'application/json; charset=utf-8')
        return request

    def publish(self, status):
        """Remember ``status`` and push it to every subscribed request.

        ``status`` must be JSON-serialisable (it goes through ``json.dumps``).
        """
        self.last_status = status
        # Iterate over a snapshot so a subscriber being removed while we are
        # writing cannot disturb the iteration.
        for req in list(self.subscriptions):
            self.push_sse_mesage(req=req, msg=status)

    def push_sse_mesage(self, req, msg):
        """Write ``msg`` to ``req`` as one ``data:`` event.

        The payload line is followed by an empty line, which terminates the
        event.
        """
        event_line = "data: {}\r\n".format(json.dumps(msg)) + '\r\n'
        req.write(event_line.encode())

    def render_GET(self, request):
        """Subscribe the client to the event stream, or return the last status as JSON."""
        # getRawHeaders returns its default when the header is absent; use an
        # empty list so a request without Accept falls through to JSON
        # instead of raising TypeError on the membership test.
        accept_values = request.requestHeaders.getRawHeaders(b'accept', [])
        # Each entry is a full header value (possibly "a, b;q=0.9"), so look
        # for the media type inside every value rather than for an exact match.
        wants_event_stream = any(b'text/event-stream' in value for value in accept_values)

        if wants_event_stream:
            request = self.add_event_stream_headers(request)
            # Empty first write, as bytes (the original passed a str).
            # NOTE(review): presumably this starts the response so the
            # headers go out before the first event; confirm.
            request.write(b"")
            request.notifyFinish().addBoth(self.response_callback, request)
            self.subscriptions.append(request)
            logging.info('Request Handler: new subscriber ({})'.format(len(self.subscriptions)))
            return server.NOT_DONE_YET

        request = self.add_json_headers(request)
        return json.dumps(self.last_status).encode()
|
|
|
|
|
|
class JobResource(resource.Resource):
    """Accept render jobs (POST) and report the worker's current job and queue (GET)."""

    isLeaf = True
    # Names of the request arguments, as bytes because ``request.args`` is
    # indexed with bytes keys throughout this class.
    PARAM_BLEND_FILE = b'blend_file'
    PARAM_SCENE = b'scene'
    PARAM_START_FRAME = b'start_frame'
    PARAM_END_FRAME = b'end_frame'

    def __init__(self, worker):
        # Initialise the base class as well so its own attributes exist.
        resource.Resource.__init__(self)
        # Object providing enqueue_job / get_current_job / get_job_queue.
        self.worker = worker

    @classmethod
    def get_param_blend_file(cls, request):
        """Return the requested .blend file path as text.

        Raises:
            ValueError: the argument is missing or does not end with ``.blend``.
            FileNotFoundError: the path does not designate an existing file.
        """
        values = request.args.get(cls.PARAM_BLEND_FILE)
        if not values or not values[0].endswith(b'.blend'):
            raise ValueError("missing or invalid blend_file parameter")

        # A failed decode raises UnicodeDecodeError, a ValueError subclass,
        # so it is reported to the client like any other malformed request.
        blend_file = values[0].decode("utf-8")
        if not os.path.isfile(blend_file):
            raise FileNotFoundError(blend_file)

        return blend_file

    @classmethod
    def get_param_scene(cls, request):
        """Return the scene name as text, or None when absent or empty."""
        values = request.args.get(cls.PARAM_SCENE)
        if not values or values[0] == b'':
            return None

        return values[0].decode("utf-8")

    @classmethod
    def _get_optional_frame(cls, request, param_name):
        """Parse the optional frame-number argument ``param_name``.

        Returns None when the argument is absent or empty, the frame number
        as an int otherwise. Raises ValueError when the value is not made of
        digits only.
        """
        values = request.args.get(param_name)
        if not values or values[0] == b'':
            return None

        raw_frame = values[0]
        if not raw_frame.isdigit():
            raise ValueError("{} must be a non-negative integer".format(param_name.decode()))

        return int(raw_frame)

    @classmethod
    def get_param_start_frame(cls, request):
        """Return the first frame to render as an int, or None when not given.

        Raises ValueError when the value is not made of digits only.
        """
        return cls._get_optional_frame(request, cls.PARAM_START_FRAME)

    @classmethod
    def get_param_end_frame(cls, request):
        """Return the last frame to render as an int, or None when not given.

        Raises ValueError when the value is not made of digits only.
        """
        return cls._get_optional_frame(request, cls.PARAM_END_FRAME)

    def prepare_job(self, request):
        """Build a ``Job`` from the request arguments.

        Propagates the ValueError / FileNotFoundError raised by the
        ``get_param_*`` helpers.
        """
        return Job(
            self.get_param_blend_file(request),
            scene=self.get_param_scene(request),
            start_frame=self.get_param_start_frame(request),
            end_frame=self.get_param_end_frame(request),
        )

    def render_POST(self, request):
        """Validate the request, enqueue the job and answer ``OK``.

        A malformed request is answered with the bad-request error page and
        an unknown blend file with the not-found page.
        """
        try:
            job = self.prepare_job(request)
        except ValueError:
            # A render method has to return bytes: render the error page
            # (which also sets the response code) instead of returning the
            # page object itself, which the original did.
            error_page = resource.ErrorPage(http.BAD_REQUEST, "Bad request", "Error in the request's format")
            return error_page.render(request)
        except FileNotFoundError:
            return resource.NoResource("Blend file not found").render(request)

        logging.info('Request Handler: received a job request: {}'.format(job))
        self.worker.enqueue_job(job)
        return b'OK'

    def render_GET(self, request):
        """Return the current job and the queued jobs as a JSON document.

        Jobs are rendered with ``str``; when no job is running
        ``current_job`` is therefore the string ``"None"``.
        """
        request.setHeader('Content-Type', 'application/json; charset=utf-8')
        return json.dumps({
            "current_job": str(self.worker.get_current_job()),
            "queue": [str(job) for job in self.worker.get_job_queue()],
        }).encode()
|
|
|
|
|
|
class CancelResource(resource.Resource):
    """Cancel queued jobs, the whole queue, the running job, or everything (POST)."""

    isLeaf = True
    # Names of the request arguments (bytes, like the keys of request.args).
    PARAM_TYPE = b'type'
    PARAM_JOBS = b'jobs'

    # Accepted values of the ``type`` argument.
    CANCEL_TYPE_QUEUED_JOBS = b'queued_jobs'
    CANCEL_TYPE_QUEUE = b'queue'
    CANCEL_TYPE_CURRENT_JOB = b'current_job'
    CANCEL_TYPE_ALL = b'all'

    def __init__(self, worker):
        # Initialise the base class as well so its own attributes exist.
        resource.Resource.__init__(self)
        # Object providing the queue-editing and cancel operations.
        self.worker = worker

    @classmethod
    def get_param_type(cls, request):
        """Return the requested cancel type (bytes).

        Raises ValueError when the argument is missing or is not one of the
        ``CANCEL_TYPE_*`` values.
        """
        values = request.args.get(cls.PARAM_TYPE)
        if not values:
            raise ValueError("missing type parameter")

        param_type = values[0]
        available_types = (
            cls.CANCEL_TYPE_QUEUED_JOBS,
            cls.CANCEL_TYPE_QUEUE,
            cls.CANCEL_TYPE_CURRENT_JOB,
            cls.CANCEL_TYPE_ALL,
        )
        if param_type not in available_types:
            raise ValueError("unknown cancel type")

        return param_type

    @classmethod
    def get_param_jobs(cls, request):
        """Return every value of the ``jobs`` argument, decoded to text.

        Raises ValueError when the argument is missing (a failed decode
        raises UnicodeDecodeError, which is a ValueError subclass).
        """
        if cls.PARAM_JOBS not in request.args:
            raise ValueError("missing jobs parameter")

        return [job.decode("utf-8") for job in request.args[cls.PARAM_JOBS]]

    def render_POST(self, request):
        """Apply the requested cancellation and answer ``OK``.

        A malformed request is answered with the bad-request error page.
        """
        logging.info('Request Handler: received cancel request')
        try:
            type_param = self.get_param_type(request)
            if type_param == self.CANCEL_TYPE_QUEUED_JOBS:
                self.worker.remove_job_list_from_queue(self.get_param_jobs(request))
            elif type_param == self.CANCEL_TYPE_QUEUE:
                self.worker.clear_job_queue()
            elif type_param == self.CANCEL_TYPE_CURRENT_JOB:
                self.worker.cancel_current_job()
            elif type_param == self.CANCEL_TYPE_ALL:
                self.worker.clear_job_queue()
                self.worker.cancel_current_job()
        except ValueError:
            # A render method has to return bytes: render the error page
            # (which also sets the response code) instead of returning the
            # page object itself, which the original did.
            error_page = resource.ErrorPage(http.BAD_REQUEST, "Bad request", "Error in the request's format")
            return error_page.render(request)

        return b'OK'
|
|
|
|
|
|
def main():
    """Wire the resources to a worker, start the worker and run the reactor.

    Listens on TCP port 8025. The worker is asked to stop through a
    "before shutdown" reactor trigger.
    """
    host, port = '', 8025

    status_output = StatusResource()
    worker = QueueConsumer(status_output)

    # Build the resource tree: /status, /job and /cancel under the index page.
    root = Index()
    children = (
        (b'status', status_output),
        (b'job', JobResource(worker)),
        (b'cancel', CancelResource(worker)),
    )
    for path, child in children:
        root.putChild(path, child)

    reactor.listenTCP(port, server.Site(root))

    worker.start_worker()
    reactor.addSystemEventTrigger("before", "shutdown", worker.stop_worker)

    logging.info('Starting server {}:{}'.format(host, port))
    reactor.run()
|
|
|
|
|
|
class QueueConsumer:
    """Run queued jobs one at a time in a background thread.

    Each job is executed as a ``/usr/bin/blender`` subprocess whose output is
    scanned for progress markers. Progress, completion and cancellation are
    reported through ``status_output.publish``.
    """

    # Progress marker looked for in each line of the subprocess output.
    _FRAME_STATUS_RE = re.compile(r"Fra:\d+")

    def __init__(self, status_output):
        self._job_queue = EditableQueue()
        # Set to ask the worker loop to exit.
        self._stop_working = Event()
        # Set to ask the worker to kill the job it is running.
        self._cancel_job = Event()
        # Object whose publish(status) receives every status tuple.
        self._status_output = status_output

        # Last progress marker reported, used to skip repeated markers.
        self._last_output_status = ''
        self._current_job = None

    def start_worker(self):
        """Start the thread that consumes the job queue."""
        self._stop_working.clear()
        worker_thread = Thread(target=self._consume_queue)
        worker_thread.start()

    def stop_worker(self):
        """Ask the worker loop to exit; it is checked between jobs."""
        self._stop_working.set()

    def enqueue_job(self, job):
        """Append ``job`` to the queue."""
        self._job_queue.put(job)

    def get_job_queue(self):
        """Return a list snapshot of the jobs still waiting in the queue."""
        return self._job_queue.as_list()

    def get_current_job(self):
        """Return the job being executed, or None when idle."""
        return self._current_job

    def cancel_current_job(self):
        """Request cancellation of the running job; do nothing when idle.

        The original set the event unconditionally, so a cancel issued while
        no job was running stayed pending and killed the next job as soon as
        it started.
        """
        if self._current_job is not None:
            self._cancel_job.set()

    def remove_job_list_from_queue(self, job_list):
        """Remove the listed jobs from the queue and publish a canceled status for each."""
        removed_jobs = self._job_queue.remove_sublist(job_list)
        for job in removed_jobs:
            self._status_output.publish((str(job), Job.STATUS_CANCELED,))

    def clear_job_queue(self):
        """Empty the queue and publish a canceled status for every removed job."""
        removed_jobs = self._job_queue.clear()
        for job in removed_jobs:
            self._status_output.publish((str(job), Job.STATUS_CANCELED,))

    def _publish_from_worker(self, status):
        """Publish ``status`` from the worker thread.

        In this file ``status_output`` is a Twisted resource whose publish()
        writes to open requests, and Twisted documents its APIs as not
        thread-safe. The call is therefore handed to the reactor thread
        instead of being made directly from the worker thread.
        """
        reactor.callFromThread(self._status_output.publish, status)

    def _consume_queue(self):
        """Worker loop: take jobs off the queue and execute them until asked to stop."""
        while not self._stop_working.is_set():
            try:
                # Time out regularly so the stop flag is re-checked.
                job = self._job_queue.get(timeout=1)
            except Empty:
                continue
            # Drop any cancel request that raced with the end of the previous job.
            self._cancel_job.clear()
            self._current_job = job
            try:
                self._execute_job()
            except Exception:
                # Thread boundary: one failing job (e.g. the executable cannot
                # be started) must not kill the worker and strand the queue.
                logging.exception('Worker: job failed: {}'.format(job))
            finally:
                self._current_job = None

    def _execute_job(self):
        """Run the current job's subprocess to completion or cancellation."""
        job = self._current_job
        logging.info('Worker: starting a new job: {}'.format(job))
        # Forget the previous job's last marker so this job's first one is
        # reported even when it is identical.
        self._last_output_status = ''
        start_time = time.time()
        command = ["/usr/bin/blender"] + job.get_blender_args()
        # The context manager closes the pipe and waits for the process, so
        # the child is always reaped, including after kill().
        with Popen(command, stdout=PIPE, stderr=STDOUT,
                   universal_newlines=True, bufsize=1) as p:
            try:
                finished = self._follow_output(p)
            except Exception:
                # Do not wait on a still-running child when leaving on error.
                p.kill()
                raise

            if finished:
                logging.info('Worker: job finished: {}'.format(job))
                self._publish_from_worker((str(job), Job.STATUS_DONE, time.time() - start_time,))
            else:
                logging.info('Worker: canceling: {}'.format(job))
                p.kill()
                self._cancel_job.clear()
                self._publish_from_worker((str(job), Job.STATUS_CANCELED,))

    def _follow_output(self, p):
        """Relay progress from ``p`` until it ends or a cancel is requested.

        Returns True when the process ran to its end, False when the loop
        stopped because cancellation was requested. Cancellation is only
        noticed between two output lines because ``readline`` blocks.
        """
        while not self._cancel_job.is_set():
            line = p.stdout.readline()
            if line == '':
                # End of output: wait for the exit instead of spinning on
                # poll() while readline keeps returning '' immediately.
                p.wait()
                return True
            status = self._parse_stdout(line)
            if status is not None:
                logging.info('Worker: job status is: {}'.format(status))
                self._publish_from_worker((str(self._current_job), status,))
        return False

    def _parse_stdout(self, line):
        """Return the progress marker found in ``line`` if it is a new one.

        Returns None when the line has no marker or repeats the marker
        reported last.
        """
        logging.debug('Worker: getting output: {}'.format(line))
        match = self._FRAME_STATUS_RE.search(line)
        if match is None:
            return None

        marker = match.group(0)
        if marker == self._last_output_status:
            return None

        self._last_output_status = marker
        return marker
|
|
|
|
|
|
class EditableQueue(Queue):
    """A ``queue.Queue`` whose pending items can be listed and removed.

    Every method holds the queue's own mutex while touching the underlying
    container, and keeps the base class bookkeeping consistent when items
    leave the queue without going through ``get``.
    """

    def remove_sublist(self, item_list):
        """Remove each element of ``item_list`` that is pending in the queue.

        For every element the first equal pending item is removed; elements
        with no match are ignored. Returns the elements of ``item_list`` that
        were matched, in the order given.
        """
        removed_items = []
        with self.mutex:
            for item in item_list:
                try:
                    self.queue.remove(item)
                except ValueError:
                    # Not pending (any more): nothing to remove for this one.
                    continue
                removed_items.append(item)
            self._forget_removed(len(removed_items))
        return removed_items

    def clear(self):
        """Remove every pending item and return them as a list, oldest first."""
        with self.mutex:
            removed_items = list(self.queue)
            self.queue.clear()
            self._forget_removed(len(removed_items))
        return removed_items

    def as_list(self):
        """Return a list snapshot of the pending items, oldest first."""
        with self.mutex:
            return list(self.queue)

    def _forget_removed(self, count):
        """Update the Queue bookkeeping after ``count`` items were removed.

        Must be called with ``self.mutex`` held. Removed items will never be
        handed to a consumer, so they are taken out of ``unfinished_tasks``
        (otherwise ``join()`` could never return), and threads blocked in
        ``put`` on a bounded queue are woken because room was freed.
        """
        if count <= 0:
            return
        self.unfinished_tasks = max(self.unfinished_tasks - count, 0)
        if self.unfinished_tasks == 0:
            self.all_tasks_done.notify_all()
        self.not_full.notify_all()
|
|
|
|
|
|
class Job:
    """Description of one render job: a .blend file plus optional scene and frame range.

    ``str(job)`` gives the textual form ``"<blend_file>><scene> <start>:<end>"``
    with placeholders for the values that are not set. A job compares equal
    to that text as well as to another job with the same fields. Because
    ``__eq__`` is defined without ``__hash__``, instances are unhashable.
    """

    STATUS_DONE = 'done'
    STATUS_CANCELED = 'canceled'

    def __init__(self, blend_file, **kwargs):
        """Create a job.

        Args:
            blend_file: path of the .blend file to render.
            **kwargs: optional ``scene``, ``start_frame`` and ``end_frame``;
                each one defaults to None when not given.
        """
        self.blend_file = blend_file
        self.scene = kwargs.get("scene")
        self.start_frame = kwargs.get("start_frame")
        self.end_frame = kwargs.get("end_frame")

    def __str__(self):
        return "{}>{} {}:{}".format(
            self.blend_file,
            self.scene if self.scene is not None else "default_scene",
            self.start_frame if self.start_frame is not None else "start",
            self.end_frame if self.end_frame is not None else "end",
        )

    def get_blender_args(self):
        """Return the command-line arguments describing this job, as a list of str.

        Scene and frame-range options are only included for the values that
        are set.
        """
        args = ['--background', self.blend_file]

        if self.scene is not None:
            args += ['--scene', self.scene]

        if self.start_frame is not None:
            args += ['--frame-start', str(self.start_frame)]

        if self.end_frame is not None:
            args += ['--frame-end', str(self.end_frame)]

        args.append('--render-anim')

        return args

    def __eq__(self, other):
        """Compare with the textual form of a job or with another job."""
        if isinstance(other, str):
            return other == str(self)

        if not isinstance(other, Job):
            # Let Python fall back to its default comparison (unequal).
            return NotImplemented

        # The original compared `other == self.blend_file`, i.e. the whole
        # other job against a path, so two identical jobs were never equal.
        return (
            other.blend_file == self.blend_file
            and other.scene == self.scene
            and other.start_frame == self.start_frame
            and other.end_frame == self.end_frame
        )
|
|
|
|
|
|
def get_index_content():
    """Return the HTML control page as bytes.

    The page offers a button that subscribes to the ``/status`` event stream,
    a form posting a job to ``/job`` and a form posting to ``/cancel``; both
    forms are submitted by script.

    Two defects of the markup are fixed here:

    * a stray, unterminated ``<input type="text" name="toto[tata]`` line in
      the cancel form swallowed the ``queued_jobs`` radio button that
      followed it, so that cancel type could not be selected;
    * status messages were inserted with ``innerHTML``; they carry
      request-supplied text (the job description), so they are now inserted
      as plain text.
    """
    return '''<html>
    <head>
    </head>
    <body>
        <input type="submit" value="track_status" onclick="return start_event();" />
        <ul id="events"></ul>
        <form action="/job" method="post" target="_blank" onsubmit="return submit_job()" id="form_job">
            <input type="radio" name="blend_file" value="/home/ewandor/projects/dev/blender/PushBlendPull/tests/pitou.blend" checked>pitou<br/>
            <input type="radio" name="blend_file" value="/home/ewandor/projects/dev/blender/PushBlendPull/tests/thom.blend">thom<br/>
            scene<input type="text" name="scene" value="pitou"><br/>
            start_frame<input type="text" name="start_frame" value="1"><br/>
            end_frame<input type="text" name="end_frame" value="50"><br/>
            <input type="submit" value="send_job"/>
        </form>
        <form action="/cancel" method="post" target="_blank" onsubmit="return submit_cancel()" id="form_cancel">
            <input type="radio" name="type" value="queued_jobs" id="queued_jobs">queued_jobs<br/>
            <input type="radio" name="type" value="queue">queue<br/>
            <input type="radio" name="type" value="current_job" checked>current_job<br/>
            <input type="radio" name="type" value="all">all<br/>
            <select multiple name="jobs" id="jobs"></select>
            <input type="submit" value="cancel"/>
        </form>
        <script type="text/javascript">
            var listening = false;
            function submit_job() {
                submit_ajax(
                    document.getElementById("form_job"),
                    function() {start_event();}
                );
                return false;
            }

            function submit_cancel() {
                submit_ajax(
                    document.getElementById("form_cancel"),
                    function() {update_queue();}
                );
                return false;
            }

            function submit_ajax(form, callback) {
                var request = new XMLHttpRequest();
                request.onreadystatechange = function() {
                    if (request.readyState == XMLHttpRequest.DONE && request.status == 200) {
                        callback();
                    }
                };
                request.open("POST", form.action);
                request.send(new FormData(form));
                update_queue()
            }
            function start_event() {
                if (!listening) {
                    var eventList = document.getElementById("events");
                    while (eventList.firstChild) {
                        eventList.removeChild(eventList.firstChild);
                    }
                    var evtSource = new EventSource("/status");
                    evtSource.onmessage = function(e) {
                        var newElement = document.createElement("li");
                        newElement.textContent = "message: " + e.data;
                        eventList.appendChild(newElement);
                        if (eventList.childNodes.length > 5) {
                            eventList.removeChild(eventList.firstChild);
                        }
                    }
                    listening = true;
                }
            }

            function update_queue() {
                var request = new XMLHttpRequest();
                request.onreadystatechange = function() {
                    if (request.readyState == XMLHttpRequest.DONE && request.status == 200) {
                        var response = JSON.parse(request.responseText);
                        var select = document.getElementById("jobs");
                        while (select.firstChild) {
                            select.removeChild(select.firstChild);
                        }
                        response.queue.forEach(function(job) {
                            var newElement = document.createElement("option");
                            newElement.text = job;
                            newElement.value = job;
                            select.appendChild(newElement);
                        });
                    }
                };
                request.open('GET', "/job", true);
                request.send(null);
            }
            document.addEventListener("DOMContentLoaded", function(event) {
                document.getElementById("queued_jobs").addEventListener("click", update_queue, false);
            }, false);
        </script>
    </body>
</html>'''.encode()
|
|
|
|
|
|
# Start the server only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
|