import os import re import time import json import logging from queue import Queue, Empty from threading import Thread, Event from subprocess import Popen, PIPE, STDOUT from twisted.internet import reactor from twisted.web import server, resource, http logging.basicConfig(level=logging.INFO) class Index(resource.Resource): isLeaf = False def getChild(self, path, request): if path == b'': return self return resource.Resource.getChild(self, path, request) def render_GET(self, request): return get_index_content() class StatusResource(resource.Resource): isLeaf = True def __init__(self): self.subscriptions = [] self.last_status = ('idle',) def response_callback(self, err, request): request.finish logging.info('Request Handler: connection either resulted in error or client closed') self.subscriptions.remove(request) def add_event_stream_headers(self, request): request.setHeader('Content-Type', 'text/event-stream; charset=utf-8') request.setHeader("Access-Control-Allow-Origin", "*") return request def add_json_headers(self, request): request.setHeader('Content-Type', 'application/json; charset=utf-8') return request def publish(self, status): self.last_status = status for req in self.subscriptions: self.push_sse_mesage(req=req, msg=status) def push_sse_mesage(self, req, msg): event_line = "data: {}\r\n".format(json.dumps(msg)) + '\r\n' req.write(event_line.encode()) def render_GET(self, request): accept_header = request.requestHeaders.getRawHeaders(b'accept') if b'text/event-stream' in accept_header: request = self.add_event_stream_headers(request) # 2. Format Headers request.write("") request.notifyFinish().addBoth(self.response_callback, request) self.subscriptions.append(request) logging.info('Request Handler: new subscriber ({})'.format(len(self.subscriptions))) return server.NOT_DONE_YET else: return json.dumps(self.last_status).encode() class JobResource(resource.Resource): isLeaf = True PARAM_BLEND_FILE = b'blend_file' PARAM_SCENE = b'scene' PARAM_START_FRAME = b'start_frame' PARAM_END_FRAME = b'end_frame' def __init__(self, worker): self.worker = worker @classmethod def get_param_blend_file(cls, request): if cls.PARAM_BLEND_FILE not in request.args or not request.args[cls.PARAM_BLEND_FILE][0].endswith(b'.blend'): raise ValueError blend_file = request.args[cls.PARAM_BLEND_FILE][0].decode("utf-8") if not os.path.isfile(blend_file): raise FileNotFoundError return blend_file @classmethod def get_param_scene(cls, request): if cls.PARAM_SCENE not in request.args or request.args[cls.PARAM_SCENE][0] == b'': return None return request.args[cls.PARAM_SCENE][0].decode("utf-8") @classmethod def get_param_start_frame(cls, request): if cls.PARAM_START_FRAME not in request.args or request.args[cls.PARAM_START_FRAME][0] == b'': return None start_frame = request.args[cls.PARAM_START_FRAME][0] if not start_frame.isdigit(): raise ValueError return int(start_frame) @classmethod def get_param_end_frame(cls, request): if cls.PARAM_END_FRAME not in request.args or request.args[cls.PARAM_END_FRAME][0] == b'': return None end_frame = request.args[cls.PARAM_END_FRAME][0] if not end_frame.isdigit(): raise ValueError return int(end_frame) def prepare_job(self, request): return Job( self.get_param_blend_file(request), scene=self.get_param_scene(request), start_frame=self.get_param_start_frame(request), end_frame=self.get_param_end_frame(request) ) def render_POST(self, request): try: job = self.prepare_job(request) logging.info('Request Handler: received a job request: {}'.format(job)) self.worker.enqueue_job(job) return b'OK' except ValueError: return resource.ErrorPage(http.BAD_REQUEST, "Bad request", "Error in the request's format") except FileNotFoundError: return resource.NoResource("Blend file not found") def render_GET(self, request): return json.dumps({ "current_job": str(self.worker.get_current_job()), "queue": [str(job) for job in self.worker.get_job_queue()] }).encode() class CancelResource(resource.Resource): isLeaf = True PARAM_TYPE = b'type' PARAM_JOBS = b'jobs' CANCEL_TYPE_QUEUED_JOBS = b'queued_jobs' CANCEL_TYPE_QUEUE = b'queue' CANCEL_TYPE_CURRENT_JOB = b'current_job' CANCEL_TYPE_ALL = b'all' def __init__(self, worker): self.worker = worker @classmethod def get_param_type(cls, request): if cls.PARAM_TYPE not in request.args: raise ValueError param_type = request.args[cls.PARAM_TYPE][0] available_type = [cls.CANCEL_TYPE_QUEUED_JOBS, cls.CANCEL_TYPE_QUEUE, cls.CANCEL_TYPE_CURRENT_JOB, cls.CANCEL_TYPE_ALL] if param_type not in available_type: raise ValueError return param_type @classmethod def get_param_jobs(cls, request): if cls.PARAM_JOBS not in request.args: raise ValueError jobs = request.args[cls.PARAM_JOBS] return [job.decode("utf-8") for job in jobs] def render_POST(self, request): logging.info('Request Handler: received cancel request') try: type_param = self.get_param_type(request) if type_param == self.CANCEL_TYPE_QUEUED_JOBS: self.worker.remove_job_list_from_queue(self.get_param_jobs(request)) elif type_param == self.CANCEL_TYPE_QUEUE: self.worker.clear_job_queue() elif type_param == self.CANCEL_TYPE_CURRENT_JOB: self.worker.cancel_current_job() elif type_param == self.CANCEL_TYPE_ALL: self.worker.clear_job_queue() self.worker.cancel_current_job() except ValueError: return resource.ErrorPage(http.BAD_REQUEST, "Bad request", "Error in the request's format") return b'OK' def main(): host = '' port = 8025 root = Index() status_output = StatusResource() worker = QueueConsumer(status_output) root.putChild(b'status', status_output) root.putChild(b'job', JobResource(worker)) root.putChild(b'cancel', CancelResource(worker)) site = server.Site(root) reactor.listenTCP(port, site) worker.start_worker() reactor.addSystemEventTrigger("before", "shutdown", worker.stop_worker) logging.info('Starting server {}:{}'.format(host, port)) reactor.run() class QueueConsumer: def __init__(self, status_output): self._job_queue = EditableQueue() self._stop_working = Event() self._cancel_job = Event() self._status_output = status_output self._last_output_status = '' self._current_job = None def start_worker(self): self._stop_working.clear() worker_thread = Thread(target=self._consume_queue) worker_thread.start() def stop_worker(self): self._stop_working.set() def enqueue_job(self, job): self._job_queue.put(job) def get_job_queue(self): return self._job_queue.as_list() def get_current_job(self): return self._current_job def cancel_current_job(self): self._cancel_job.set() def remove_job_list_from_queue(self, job_list): removed_jobs = self._job_queue.remove_sublist(job_list) for job in removed_jobs: self._status_output.publish((str(job), Job.STATUS_CANCELED,)) def clear_job_queue(self): removed_jobs = self._job_queue.clear() for job in removed_jobs: self._status_output.publish((str(job), Job.STATUS_CANCELED,)) def _consume_queue(self): while not self._stop_working.is_set(): try: job = self._job_queue.get(timeout=1) except Empty: continue self._current_job = job self._execute_job() self._current_job = None def _execute_job(self): logging.info('Worker: starting a new job: {}'.format(self._current_job)) start_time = time.time() p = Popen(["/usr/bin/blender"] + self._current_job.get_blender_args(), stdout=PIPE, stderr=STDOUT, universal_newlines=True, bufsize=1) while not self._cancel_job.is_set(): line = p.stdout.readline() if line == '': if p.poll() is not None: logging.info('Worker: job finished: {}'.format(self._current_job)) self._status_output.publish((str(self._current_job), Job.STATUS_DONE, time.time() - start_time,)) break else: status = self._parse_stdout(line) if status is not None: logging.info('Worker: job status is: {}'.format(status)) self._status_output.publish((str(self._current_job), status,)) if self._cancel_job.is_set(): logging.info('Worker: canceling: {}'.format(self._current_job)) p.kill() self._cancel_job.clear() self._status_output.publish((str(self._current_job), Job.STATUS_CANCELED,)) def _parse_stdout(self, line): logging.debug('Worker: getting output: {}'.format(line)) regex_blend_status = "Fra:\d+" t = re.search(regex_blend_status, line) if t is not None and t[0] != self._last_output_status: self._last_output_status = t[0] return t[0] else: return None class EditableQueue(Queue): def remove_sublist(self, item_list): with self.not_empty: removed_items = [] for item in item_list: try: self.queue.remove(item) except ValueError: pass else: removed_items.append(item) return removed_items def clear(self): with self.not_empty: removed_items = list(self.queue) self.queue.clear() return removed_items def as_list(self): with self.not_empty: return list(self.queue) class Job: STATUS_DONE = 'done' STATUS_CANCELED = 'canceled' def __init__(self, blend_file, **kwargs): self.blend_file = blend_file self.scene = kwargs["scene"] if "scene" in kwargs else None self.start_frame = kwargs["start_frame"] if "start_frame" in kwargs else None self.end_frame = kwargs["end_frame"] if "end_frame" in kwargs else None def __str__(self): return "{}>{} {}:{}".format( self.blend_file, self.scene if self.scene is not None else "default_scene", self.start_frame if self.start_frame is not None else "start", self.end_frame if self.end_frame is not None else "end" ) def get_blender_args(self): args = ['--background', self.blend_file] if self.scene is not None: args.append('--scene') args.append(self.scene) if self.start_frame is not None: args.append('--frame-start') args.append(str(self.start_frame)) if self.end_frame is not None: args.append('--frame-end') args.append(str(self.end_frame)) args.append('--render-anim') return args def __eq__(self, other): if isinstance(other, str): return other == str(self) return other == self.blend_file and other.scene == self.scene \ and other.start_frame == self.start_frame \ and other.end_frame == self.end_frame def get_index_content(): return '''