import os import re import json import logging from queue import Queue, Empty from threading import Thread, Event from subprocess import Popen, PIPE, STDOUT import requests from twisted.internet import reactor from twisted.web import server, resource, http logging.basicConfig(level=logging.INFO) class Index(resource.Resource): isLeaf = False def getChild(self, path, request): if path == b'': return self return resource.Resource.getChild(self, path, request) def render_GET(self, request): return '' class StatusResource(resource.Resource): isLeaf = True def __init__(self): self.subscriptions = [] self.last_status = ('idle',) def response_callback(self, err, request): request.finish logging.info('Request Handler: connection either resulted in error or client closed') self.subscriptions.remove(request) def add_event_stream_headers(self, request): request.setHeader('Content-Type', 'text/event-stream; charset=utf-8') request.setHeader("Access-Control-Allow-Origin", "*") return request def add_json_headers(self, request): request.setHeader('Content-Type', 'application/json; charset=utf-8') return request def publish(self, status): self.last_status = status for req in self.subscriptions: self.push_sse_mesage(req=req, msg=status) def push_sse_mesage(self, req, msg): event_line = "data: {}\r\n".format(json.dumps(msg)) + '\r\n' req.write(event_line.encode()) def render_GET(self, request): accept_header = request.requestHeaders.getRawHeaders(b'accept') if b'text/event-stream' in accept_header: request = self.add_event_stream_headers(request) # 2. Format Headers request.write("") request.notifyFinish().addBoth(self.response_callback, request) self.subscriptions.append(request) logging.info('Request Handler: new subscriber ({})'.format(len(self.subscriptions))) return server.NOT_DONE_YET else: return json.dumps(self.last_status).encode() class JobResource(resource.Resource): isLeaf = True def __init__(self, fake_worker): self.fake_worker = fake_worker def render_POST(self, request): self.fake_worker.start_fake_working() return b'OK' def render_GET(self, request): return json.dumps({ "current_job": '', "queue": [] }).encode() def main(): host = '' port = 8025 for x in range(1, 255): interface = '127.0.0.{}'.format(x) root = Index() status_output = StatusResource() worker = FakeWorker(status_output) root.putChild(b'status', status_output) root.putChild(b'job', JobResource(worker)) site = server.Site(root) reactor.listenTCP(port, site, interface=interface) reactor.addSystemEventTrigger("after", "startup", worker.stop_worker) logging.info('Starting server {}:{}'.format(host, port)) reactor.run() def send_request_to_dispatcher(): request_param = { 'workers': '', 'workpile': '', 'project_path': '' } requests.post('http://localhost:8026/workpile', data=request_param) class FakeWorker: def __init__(self, status_output): self._stop_working = Event() self._cancel_job = Event() self._status_output = status_output self._last_output_status = '' self._current_job = None def start_fake_working(self): worker_thread = Thread(target=self._do_fake_work) worker_thread.start() def _do_fake_work(self): for x in range(1,1000): self._status_output.publish(('fakeblendfile>fakescene', "Fra:{}".format(x),)) self._status_output.publish(('fakeblendfile>fakescene', "done",)) if __name__ == "__main__": main()