diff --git a/README.md b/README.md
index 087853d..4b3351f 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,26 @@
# PushBlendPull
-Blender plugin that allows you to deport the actual blending of your project on another computer.
\ No newline at end of file
+Blender plugin that allows you to offload the actual blending of your project to another computer.
+
+## Requirements:
+1. Worker:
+    * twisted (requires the Python development headers to build)
+
+## Bugs:
+
+
+## Improvements:
+1. Worker
+ * Clean after cancel
+2. Dispatcher
+ * Accept workload parameter
+    * Cancel Job/Farm (redispatch)/All
+    * Sync only / blend only
+ * Exchange only needed files
+ * P2P file exchange
+ * Slave mode
+3. Plugin
+ * Push/Pull to remote dispatcher
+ * Allow farm configuration/file exchange
+    * Track rendered/canceled scenes/blend files
+
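+## Usage sketch:
+A minimal, untested sketch of driving the HTTP endpoints of `worker.py` (default port 8025) and `dispatcher.py` (default port 8026); hostnames, paths and the scene name below are placeholders.
+
+```python
+import requests
+
+# Queue a single job directly on a worker (JobResource in worker.py);
+# blend_file must be a path that exists on the worker machine.
+requests.post("http://localhost:8025/job", data={
+    "blend_file": "/path/on/worker/scene.blend",
+    "scene": "Scene",
+    "start_frame": 1,
+    "end_frame": 50,
+})
+
+# Or hand a whole workpile to the dispatcher (WorkpileResource in
+# dispatcher.py): workers are "host:port:/remote/project/path" strings and
+# workpile entries are "blendfile>scene" pairs under project_path.
+requests.post("http://localhost:8026/workpile", data={
+    "workers": ["localhost:8025:/path/on/worker/"],
+    "workpile": ["/path/to/project/scene.blend>Scene"],
+    "project_path": "/path/to/project",
+})
+```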
diff --git a/client.py b/client.py
new file mode 100644
index 0000000..ffd9313
--- /dev/null
+++ b/client.py
@@ -0,0 +1,158 @@
+import bpy
+
+from bpy.types import Panel, Operator, PropertyGroup, UIList
+from bpy.props import (StringProperty,
+ BoolProperty,
+ IntProperty,
+ FloatProperty,
+ EnumProperty,
+ PointerProperty,
+ CollectionProperty
+ )
+
+
+class PBPBlendFileProperty(PropertyGroup):
+ path = StringProperty(subtype="FILE_PATH")
+ filename = StringProperty(subtype="FILE_NAME")
+ blending_enabled = BoolProperty(
+ name="Enable or Disable",
+ description="Enable blend file for blending",
+ default=False)
+ status = StringProperty(
+ name="Status",
+ description="Status of current file",
+ default=""
+ )
+
+
+class PBPWorkerProperty(PropertyGroup):
+ hostname = StringProperty()
+ port = StringProperty()
+ remote_project_path = StringProperty()
+ blending_enabled = BoolProperty(
+ name="Enable or Disable",
+ description="Enable farm for blending",
+ default=False)
+ status = StringProperty(
+ name="Status",
+ description="Status of current file",
+ default=""
+ )
+
+
+class PBPProperties(PropertyGroup):
+ project_folder = StringProperty(
+ name="Project folder",
+ description="Path to project folder",
+ default="//",
+ subtype="DIR_PATH"
+ )
+ worker_list = CollectionProperty(type=PBPWorkerProperty)
+    worker_list_index = IntProperty(name="Index for worker_list", default=0)
+
+ blend_file_list = CollectionProperty(type=PBPBlendFileProperty)
+ upload_status = StringProperty(
+ name="Upload status",
+ description="Status of current upload",
+ default=""
+ )
+
+
+class RENDER_UL_workers(UIList):
+ def draw_item(self, context, layout, data, item, icon, active_data, active_propname, index):
+ layout.label(item.hostname)
+
+
+class PBPProjectPanel(Panel):
+ bl_idname = "RENDER_PT_PBP_project_panel"
+    bl_label = "PBP Project Settings"
+ bl_space_type = 'PROPERTIES'
+ bl_region_type = 'WINDOW'
+ bl_context = "render"
+
+ def draw(self, context):
+ layout = self.layout
+
+ # layout.operator(ProjectManagerStartBlend.bl_idname)
+ configuration = context.window_manager.push_blend_pull
+ layout.label(text="Workers")
+
+ row = layout.row()
+ col = row.column()
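+        # UILayout.template_list(listtype_name, list_id, dataptr, propname,
+        # active_dataptr, active_propname): draw worker_list through the
+        # RENDER_UL_workers UIList and track the selection in worker_list_index.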
+ col.template_list("RENDER_UL_workers", "", configuration, "worker_list", configuration, "worker_list_index", rows=2)
+
+ col = row.column()
+ sub = col.column(align=True)
+ sub.operator("render.pbp_add_worker", icon='ZOOMIN', text="")
+ sub.operator("render.pbp_del_worker", icon='ZOOMOUT', text="")
+
+
+class PBPWorkersPanel(Panel):
+ bl_idname = "RENDER_PT_PBP_workers_panel"
+ bl_label = "PBP Workers Settings"
+ bl_space_type = 'PROPERTIES'
+ bl_region_type = 'WINDOW'
+ bl_context = "render"
+
+ def draw(self, context):
+ layout = self.layout
+
+ # layout.operator(ProjectManagerStartBlend.bl_idname)
+ configuration = context.window_manager.push_blend_pull
+ layout.label(text="Workers")
+
+ row = layout.row()
+ col = row.column()
+ col.template_list("RENDER_UL_workers", "", configuration, "worker_list", configuration, "worker_list_index", rows=2)
+
+ col = row.column()
+ sub = col.column(align=True)
+ sub.operator("render.pbp_add_worker", icon='ZOOMIN', text="")
+ sub.operator("render.pbp_del_worker", icon='ZOOMOUT', text="")
+
+
+class PBPAddWorkerOperator(Operator):
+ bl_idname = "render.pbp_add_worker"
+ bl_label = "Add a worker"
+
+ def execute(self, context):
+
+ return {'FINISHED'}
+
+    def invoke(self, context, event):
+        # Add the new entry here rather than in draw(), which runs on every
+        # redraw and would keep appending workers.
+        configuration = context.window_manager.push_blend_pull
+        configuration.worker_list.add()
+        wm = context.window_manager
+        return wm.invoke_props_dialog(self, width=600)
+
+    def draw(self, context):
+        configuration = context.window_manager.push_blend_pull
+        new_worker = configuration.worker_list[-1]
+ layout = self.layout
+ row = layout.row(align=True)
+ row.prop(new_worker, "hostname")
+ row.prop(new_worker, "port")
+ row.prop(new_worker, "remote_project_path")
+ row.prop(new_worker, "blending_enabled")
+
+
+class PBPDelWorkerOperator(Operator):
+ bl_idname = "render.pbp_del_worker"
+ bl_label = "Remove a worker"
+
+ def execute(self, context):
+ return {'FINISHED'}
+
+
+def register():
+
+ bpy.utils.register_class(RENDER_UL_workers)
+ bpy.utils.register_module(__name__)
+ bpy.types.WindowManager.push_blend_pull = PointerProperty(type=PBPProperties)
+
+
+def unregister():
+ bpy.utils.unregister_module(__name__)
+ del bpy.types.WindowManager.push_blend_pull
+
+
+if __name__ == "__main__":
+ register()
diff --git a/dispatcher.py b/dispatcher.py
new file mode 100644
index 0000000..82ba00e
--- /dev/null
+++ b/dispatcher.py
@@ -0,0 +1,574 @@
+import os
+import re
+import math
+import json
+import logging
+
+from threading import Thread, Event, RLock
+from subprocess import Popen, PIPE, STDOUT
+
+import requests
+from requests.exceptions import ConnectionError
+
+from twisted.internet import reactor
+from twisted.web import server, resource, http
+
+import sseclient
+import blendfile as bf
+
+logging.basicConfig(level=logging.INFO)
+
+PROJECT_PATH = "/home/ewandor/projects/dev/blender/PushBlendPull"
+
+class Index(resource.Resource):
+ isLeaf = False
+
+ def getChild(self, path, request):
+ if path == b'':
+ return self
+ return resource.Resource.getChild(self, path, request)
+
+ def render_GET(self, request):
+ return get_index_content()
+
+
+class WorkpileResource(resource.Resource):
+ isLeaf = True
+ PARAM_WORKERS = b'workers'
+ PARAM_WORKPILE = b'workpile'
+ PARAM_PROJECT_PATH = b'project_path'
+
+ @classmethod
+ def get_param_workers(cls, request):
+ if cls.PARAM_WORKERS not in request.args:
+ raise ValueError
+
+        regex_worker = "^(?P<server>[^:]+(:\d+)?[^:]*):(?P<remote_path>.+)$"
+ workers = []
+ for worker in request.args[cls.PARAM_WORKERS]:
+ match = re.search(regex_worker, worker.decode("utf-8"))
+ if match is None:
+ raise ValueError
+ workers.append((match.group('server'), match.group('remote_path'),))
+
+ return workers
+
+ @classmethod
+ def get_param_workpile(cls, request):
+ if cls.PARAM_WORKPILE not in request.args:
+ raise ValueError
+
+ workpile = []
+ for work in request.args[cls.PARAM_WORKPILE]:
+ result = work.decode("utf-8").split('>')
+ if len(result) != 2:
+ raise ValueError
+ workpile.append(tuple(result))
+
+ return workpile
+
+ @classmethod
+ def get_param_project_path(cls, request):
+ if cls.PARAM_PROJECT_PATH not in request.args:
+ raise ValueError
+
+ project_path = request.args[cls.PARAM_PROJECT_PATH][0].decode("utf-8")
+ if not os.path.isdir(project_path):
+ raise FileNotFoundError
+
+ return project_path
+
+ def render_POST(self, request):
+ try:
+ logging.info('Request Handler: Received a new work pile')
+ stats = JobDispatcher.process_workpile(
+ self.get_param_workers(request),
+ self.get_param_project_path(request),
+ self.get_param_workpile(request)
+ )
+ return b'OK'
+        except ValueError:
+            return resource.ErrorPage(http.BAD_REQUEST, "Bad request",
+                                      "Error in the request's format").render(request)
+        except FileNotFoundError:
+            return resource.NoResource("Blend file not found").render(request)
+
+
+class StatusResource(resource.Resource):
+ isLeaf = True
+
+ def render_GET(self, request):
+ return json.dumps(get_status()).encode()
+
+
+class Job:
+ STATUS_QUEUED = 'queued'
+ STATUS_DONE = 'done'
+ STATUS_CANCELED = 'canceled'
+
+ def __init__(self, blend_file, scene, start_frame, end_frame, local_project_path, remote_project_path):
+ self.blend_file = blend_file
+ self.scene = scene
+ self.start_frame = start_frame
+ self.end_frame = end_frame
+
+ self.local_project_path = local_project_path
+ self.remote_project_path = remote_project_path
+
+ self.current_frame = 0
+ self.status = self.STATUS_QUEUED
+
+ def __str__(self):
+ return "{}>{} {}:{}".format(
+ self.blend_file,
+ self.scene if self.scene is not None else "default_scene",
+ self.start_frame if self.start_frame is not None else "start",
+ self.end_frame if self.end_frame is not None else "end"
+ )
+
+ def get_http_payload(self):
+ result = {'blend_file': self.blend_file}
+ if self.scene is not None:
+ result['scene'] = self.scene
+ if self.start_frame is not None:
+ result['start_frame'] = self.start_frame
+ if self.end_frame is not None:
+ result['end_frame'] = self.end_frame
+
+ return result
+
+ def __eq__(self, other):
+ if isinstance(other, str):
+ return other == str(self)
+
+        return other.blend_file == self.blend_file and other.scene == self.scene \
+ and other.start_frame == self.start_frame \
+ and other.end_frame == self.end_frame
+
+
+class Worker:
+ path_status = '/status'
+ path_job = '/job'
+ path_cancel = '/cancel'
+ worker_pool = []
+ finished_jobs = []
+
+ @classmethod
+ def get_worker(cls, authority):
+ try:
+ worker = cls.worker_pool[cls.worker_pool.index(authority)]
+ except ValueError:
+ worker = Worker(authority)
+
+ return worker
+
+ def __init__(self, authority):
+ self.authority = authority
+ self.job_queue = []
+ self.idle = True
+
+ self._upload_lock = RLock()
+ self._download_lock = RLock()
+ self.upload_status = None
+ self.download_status = None
+
+ self.worker_pool.append(self)
+
+ def get_http_address(self, path):
+ return 'http://' + self.authority + path
+
+ def check_online(self):
+ try:
+ requests.get(self.get_http_address(self.path_status))
+ except ConnectionError:
+ return False
+ return True
+
+ def process_joblist(self, project_path, remote_path, joblist):
+ reactor.callInThread(self._do_process_joblist, project_path, remote_path, joblist)
+ #Thread(target=self._do_process_joblist, args=(project_path, remote_path, joblist,)).start()
+
+ def _do_process_joblist(self, project_path, remote_path, joblist):
+ logging.info('Worker<{}>: Synchronizing worker'.format(self))
+ self.push_to(project_path, remote_path)
+ for job in joblist:
+ self.enqueue_job(job)
+
+ def push_to(self, project_path, remote_path):
+ self._upload_lock.acquire()
+ for upload_status in RemoteSynchronizer.push_to_worker(self, project_path, remote_path):
+ self.upload_status = upload_status
+
+ self.upload_status = None
+ self._upload_lock.release()
+
+ def pull_from(self, project_path, remote_path):
+ self._download_lock.acquire()
+ for download_status in RemoteSynchronizer.pull_from_worker(self, project_path, remote_path):
+ self.download_status = download_status
+
+ self.download_status = None
+ self._download_lock.release()
+
+ def enqueue_job(self, job):
+ self.job_queue.append(job)
+ if self.idle:
+ ready_event = Event()
+ Thread(target=self.start_listening, args=(ready_event,)).start()
+ ready_event.wait()
+ logging.info('Worker<{}>: Sending {}'.format(self, job))
+ requests.post(self.get_http_address(self.path_job), data=job.get_http_payload())
+
+ def start_listening(self, ready_event):
+ response = requests.get(
+ self.get_http_address(self.path_status),
+ stream=True,
+ headers={'accept': 'text/event-stream'}
+ )
+ status_output = sseclient.SSEClient(response)
+ stop = False
+ self.idle = False
+ ready_event.set()
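+        # The worker publishes SSE messages as JSON tuples whose first item is
+        # str(job) and whose second is either a 'Fra:<n>' progress line, 'done'
+        # or 'canceled' (see StatusResource.publish in worker.py).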
+ for event in status_output.events():
+ event_data = json.loads(event.data)
+ if event_data[1] == Job.STATUS_CANCELED:
+ if event_data[0] in self.job_queue:
+ job = self.job_queue.pop(self.job_queue.index(event_data[0]))
+ job.status = Job.STATUS_CANCELED
+ self.finished_jobs.append(job)
+
+ elif event_data[0] == self.job_queue[0]:
+ if event_data[1] == Job.STATUS_DONE:
+ job = self.job_queue.pop(0)
+ job.status = Job.STATUS_DONE
+ self.finished_jobs.append(job)
+ logging.info('Worker<{}>: Job done {}, starting syncing local files'.format(self, job))
+ Thread(target=self.pull_from, args=(job.local_project_path, job.remote_project_path)).start()
+ if len(self.job_queue) == 0:
+ stop = True
+ elif event_data[1][:4] == 'Fra:':
+ self.job_queue[0].current_frame = int(event_data[1][4:])
+ if stop:
+ status_output.close()
+ self.idle = True
+ return
+
+ def __eq__(self, other):
+ if isinstance(other, str):
+ return other == self.authority
+
+ return other.authority == self.authority
+
+ def __str__(self):
+ return self.authority
+
+ def __hash__(self):
+ return self.authority.__hash__()
+
+
+class RemoteSynchronizer:
+ @classmethod
+ def push_to_worker(cls, worker, local_path, remote_path):
+ address = worker.authority.split(':')[0]
+
+ destination = "{}:{}".format(address, remote_path)
+ return cls._do_sync(local_path, destination, ['*.blend1', 'README.md'])
+
+ @classmethod
+ def pull_from_worker(cls, worker, local_path, remote_path):
+ address = worker.authority.split(':')[0]
+
+ source = "{}:{}".format(address, remote_path)
+ return cls._do_sync(source, local_path, ['*.blend'])
+
+ @classmethod
+ def _do_sync(cls, source, destination, exclude_list):
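+        # rsync -azP with --info=progress2 prints a single overall progress
+        # figure ("NN%") that handleLine() extracts and yields to the caller.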
+ args = ['rsync', '-azP', '--info=progress2']
+ for exclude_patern in exclude_list:
+ args.append('--exclude')
+ args.append(exclude_patern)
+
+ args.append(source)
+ args.append(destination)
+
+ p = Popen(args, stdout=PIPE, stderr=STDOUT, universal_newlines=True, bufsize=1)
+ last_line = ''
+ while True:
+ line = p.stdout.readline()
+ if line == '':
+ if p.poll() is not None:
+ break
+ elif line != last_line:
+ logging.debug('Synch: stdout: {}'.format(line))
+ last_line = line
+
+ status = cls.handleLine(line)
+ if status is not None:
+ yield status
+
+
+ REGEX_PUSHPULL_STATUS = "\d+%"
+ @classmethod
+ def handleLine(cls, line, current_file=None):
+ if line == 'end':
+ return line
+ t = re.search(cls.REGEX_PUSHPULL_STATUS, line)
+ if t is not None:
+ return t[0]
+
+
+def get_status():
+ workers_status = {}
+ subjobs_status = {}
+ for worker in Worker.worker_pool:
+ for subjob in worker.finished_jobs:
+ jobkey = (subjob.blend_file, subjob.scene)
+ if jobkey not in subjobs_status:
+ subjobs_status[jobkey] = []
+
+ subjobs_status[jobkey].append(subjob)
+
+ job_queue_status = []
+ for subjob in worker.job_queue:
+ job_queue_status.append(get_subjob_stat(subjob))
+ jobkey = (subjob.blend_file, subjob.scene)
+ if jobkey not in subjobs_status:
+ subjobs_status[jobkey] = []
+
+ subjobs_status[jobkey].append(subjob)
+ workers_status[worker.authority] = {
+ 'upload_status': worker.upload_status,
+ 'download_status':worker.download_status,
+ 'job_queue': job_queue_status
+ }
+ jobs_status = {}
+ for job, subjobs in subjobs_status.items():
+ total_frame = 0
+ frame_done = 0
+ for subjob in subjobs:
+ total_frame += subjob.end_frame - subjob.start_frame + 1
+ frame_done += subjob.current_frame - subjob.start_frame + 1
+ jobs_status['>'.join(job)] = {'frame_done': frame_done, 'total_frame': total_frame}
+
+ return {
+ 'project_path': PROJECT_PATH,
+ 'workers': workers_status,
+ 'jobs': jobs_status
+ }
+
+
+def get_subjob_stat(subjob):
+ return {
+ 'name': str(subjob),
+ 'current_frame': subjob.current_frame,
+ 'total_frame': subjob.end_frame - subjob.start_frame + 1
+ }
+
+
+class JobDispatcher:
+
+ @staticmethod
+ def process_workpile(worker_list, project_path, workpile):
+ checked_worker_list = []
+ remote_paths = {}
+ for worker_param in worker_list:
+ worker = Worker.get_worker(worker_param[0])
+ if worker.check_online():
+ checked_worker_list.append((worker, worker_param[1],))
+
+ dispatched_jobs = {w: [] for w in checked_worker_list}
+ for blend_file, scene in workpile:
+ dispatch = JobDispatcher.dispatch_scene_to_workers(
+ checked_worker_list,
+ project_path, blend_file, scene)
+ for worker, job in dispatch.items():
+ dispatched_jobs[worker].append(job)
+
+ for (worker, joblist) in dispatched_jobs.items():
+ worker[0].process_joblist(project_path, worker[1], joblist)
+
+ return dispatched_jobs
+
+ @staticmethod
+ def dispatch_scene_to_workers(worker_list, project_path, blend_file, scene):
+ blend = bf.open_blend(blend_file)
+ scenes_info = retrieve_blenderfile_scenes_info(blend)
+ if scene not in scenes_info:
+ raise ValueError
+
+ start_frame, end_frame = scenes_info[scene]
+
+ undispatched_frames = end_frame - start_frame + 1
+ available_workers = len(worker_list)
+
+ next_start_frame = start_frame
+ round_batch = False
+ job_dispatch = {}
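+        # Split the scene's frame range across the workers: while the remaining
+        # frames do not divide evenly, hand out ceil(remaining / workers_left)
+        # frames; once they divide evenly, that batch size stays valid for every
+        # remaining worker, since both counters shrink in step.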
+ for worker, remote_project_path in worker_list:
+ if not round_batch and undispatched_frames % available_workers != 0:
+ batch = math.ceil(undispatched_frames / available_workers)
+ elif not round_batch:
+ batch = undispatched_frames // available_workers
+ round_batch = True
+
+ job = Job(
+ blend_file.replace(project_path, remote_project_path),
+ scene, next_start_frame, next_start_frame + batch - 1,
+ project_path, remote_project_path)
+
+ job_dispatch[(worker, remote_project_path,)] = job
+
+ next_start_frame += batch
+ undispatched_frames -= batch
+ available_workers -= 1
+
+ return job_dispatch
+
+
+def main():
+ host = ''
+ port = 8026
+
+ root = Index()
+
+ root.putChild(b'status', StatusResource())
+ root.putChild(b'workpile', WorkpileResource())
+ # root.putChild(b'cancel', CancelResource)
+ site = server.Site(root)
+ reactor.listenTCP(port, site)
+
+ logging.info('Starting server {}:{}'.format(host, port))
+ reactor.run()
+# worker_list = [('localhost:8025', '/home/ewandor/projects/dev/blender/PushBlendPull/',)]
+# project_path = '/home/ewandor/projects/dev/blender/PushBlendPull/'
+# dispatch_scene_to_workers(
+# worker_list,
+# project_path,
+# '/home/ewandor/projects/dev/blender/PushBlendPull/tests/pitou.blend',
+# 'thom'
+# )
+
+
+def retrieve_blendfile_assets(blendfile):
+ assets_dna_type = ['Image', 'bSound', 'MovieClip']
+ assets = []
+ for block in blendfile.blocks:
+ if block.dna_type_name in assets_dna_type and block.get(b'name') != '':
+ assets.append(block.get(b'name'))
+
+ return assets
+
+
+def retrieve_blenderfile_scenes_info(blendfile):
+ scenes_info = {}
+ scenes = [block for block in blendfile.blocks if block.code == b'SC']
+ for scene in scenes:
+ scenes_info[scene.get((b'id', b'name'))[2:]] = (
+ scene.get((b'r', b'sfra')),
+ scene.get((b'r', b'efra')),
+ )
+
+ return scenes_info
+
+
+# blend1 = bf.open_blend('tests/test_blendfile.blend1')
+# blend = bf.open_blend('tests/pitou.blend')
+#
+# seqs = [block for block in blend.blocks if block.dna_type_name == 'Sequence']
+#
+# for block in blend.blocks:
+# print(block.code, '-', block.dna_type_name)
+# print('coucou')
+# for block in blend1.blocks:
+# print(block.code, '-', block.dna_type_name)
+# # assets = retrieve_blendfile_assets(blend)
+# scenes_info = retrieve_blenderfile_scenes_info(blend)
+#
+#
+# print(scenes_info[2][0])
+def get_index_content():
+ return '''
+
+
+
+
+
+
+
+
+'''.encode()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/test.blend b/test.blend
new file mode 100644
index 0000000..9ae56e8
Binary files /dev/null and b/test.blend differ
diff --git a/test.blend1 b/test.blend1
new file mode 100644
index 0000000..aa162c4
Binary files /dev/null and b/test.blend1 differ
diff --git a/test.py b/test.py
new file mode 100644
index 0000000..0a0b094
--- /dev/null
+++ b/test.py
@@ -0,0 +1,262 @@
+import bpy
+import os
+import re
+import time
+
+from queue import Queue
+from threading import Thread, Event
+from subprocess import Popen, PIPE, STDOUT
+
+from bpy.types import Panel, Operator, PropertyGroup
+from bpy.props import (StringProperty,
+ BoolProperty,
+ IntProperty,
+ FloatProperty,
+ EnumProperty,
+ PointerProperty,
+ CollectionProperty
+ )
+
+
+class BlendFileProperty(PropertyGroup):
+ path = StringProperty(subtype="FILE_PATH")
+ filename = StringProperty(subtype="FILE_NAME")
+ blending_enabled = BoolProperty(
+ name="Enable or Disable",
+ description="Enable blend file for blending",
+ default=False)
+ status = StringProperty(
+ name="Status",
+ description="Status of current file",
+ default=""
+ )
+
+
+class PushBlendPullProperties(PropertyGroup):
+ project_folder = StringProperty(
+ name="Project folder",
+ description="Path to project folder",
+ default="//",
+ subtype="DIR_PATH"
+ )
+ remote_host = StringProperty(
+ name="Remote host",
+ description="Address or host of remote",
+        default=""
+ )
+ remote_folder = StringProperty(
+ name="Remote project folder",
+ description="Path to remote project folder",
+ default=""
+ )
+ blend_file_list = CollectionProperty(type=BlendFileProperty)
+ upload_status = StringProperty(
+ name="Upload status",
+ description="Status of current upload",
+ default=""
+ )
+
+
+class ProjectManagerRefreshBlenFileOperator(Operator):
+ bl_idname = "wm.pm_refresh_blend"
+ bl_label = "Refresh Blender Files"
+
+ def execute(self, context):
+ blendFileList = context.window_manager.project_manager.blend_file_list
+ enabled = []
+ for blendFile in blendFileList:
+ if blendFile.blending_enabled:
+ enabled.append(blendFile.path)
+
+ blendFileList.clear()
+ path = bpy.path.abspath(context.window_manager.project_manager.project_folder)
+ for root, dirs, files in os.walk(path):
+ for file in files:
+ if file.endswith(".blend"):
+ blendFile = blendFileList.add()
+ blendFile.filename = os.path.splitext(file)[0]
+ blendFile.path = os.path.join(root, file)
+ blendFile.name = bpy.path.relpath(blendFile.path)
+ blendFile.blending_enabled = blendFile.path in enabled
+
+ return {'FINISHED'}
+
+
+class ProjectManagerStartBlend(Operator):
+ bl_idname = "wm.project_manager"
+ bl_label = "Start blend"
+
+ _timer = None
+ _queue = None
+ _worker = None
+
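+    # The rsync/ssh work runs in a RemoteThreadHandler thread that reports
+    # progress through a Queue; a window-manager timer drives modal(), which
+    # drains that queue so UI properties are only touched from the main thread.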
+ def execute(self, context):
+ wm = context.window_manager
+
+ self._queue = Queue()
+ self._worker = RemoteThreadHandler(
+ self._queue,
+ wm.project_manager.blend_file_list,
+ bpy.path.abspath(wm.project_manager.project_folder),
+ wm.project_manager.remote_host,
+ wm.project_manager.remote_folder
+ )
+ self._worker.start()
+
+ self._timer = wm.event_timer_add(1, context.window)
+ wm.modal_handler_add(self)
+ return {'RUNNING_MODAL'}
+
+ def modal(self, context, event):
+ if event.type in {'ESC'}:
+ self.cancel(context)
+ print("canceled")
+ return {'CANCELLED'}
+
+ if event.type == 'TIMER':
+ while not self._queue.empty():
+ self.handle_message(self._queue.get(), context.window_manager.project_manager)
+ self._queue.task_done()
+ if not self._worker.is_alive():
+ self.cancel(context)
+ print("finished")
+ return {'FINISHED'}
+
+ return {'PASS_THROUGH'}
+
+ def cancel(self, context):
+ wm = context.window_manager
+ wm.event_timer_remove(self._timer)
+ self._worker.stop()
+
+ def handle_message(self, msg, pm):
+ if msg[0] == "push":
+ print(msg)
+ pm.upload_status = "push "+msg[1]
+ elif msg[0] == "pull":
+ msg[2].status = "pull "+msg[1]
+ elif msg[0] == "blend":
+ msg[2].status = "blend "+msg[1]
+
+
+class RemoteThreadHandler(Thread):
+ def __init__(self, q, blendFileList, localFolder, remoteHost, remoteFolder):
+ super(RemoteThreadHandler, self).__init__()
+
+ self._queue = q
+ self.blendFileList = blendFileList
+ self.remotePath = "{}:{}".format(remoteHost,remoteFolder)
+ self.localFolder = localFolder
+ self.remoteHost = remoteHost
+ self.remoteFolder = remoteFolder
+
+ self._stop_event = Event()
+
+ def stop(self):
+ self._stop_event.set()
+
+ def stopped(self):
+ return self._stop_event.is_set()
+
+ ARGS_PUSH = "rsync -azP --info=progress2 --exclude '*.blend1' --exclude 'README.md' {} {}"
+ ARGS_PULL = "rsync -azP --info=progress2 --exclude '*.blend' {} {}"
+ ARGS_BLEND = "ssh -t hawat 'blender -b {} -a'"
+
+ REGEX_PUSHPULL_STATUS = "\d+%"
+
+ def handleLinePush(self, line, current_file=None):
+ if line == 'end':
+ self._queue.put(("push", "end",))
+ return
+ t = re.search(self.REGEX_PUSHPULL_STATUS, line)
+ if t is not None:
+ self._queue.put(("push", t[0],))
+
+ def handleLinePull(self, line, current_file):
+ if line == 'end':
+ self._queue.put(("pull", "end", current_file,))
+ return
+ t = re.search(self.REGEX_PUSHPULL_STATUS, line)
+ if t is not None:
+ self._queue.put(("pull", t[0], current_file,))
+
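+    # Assumed minimal handler for the remote blender output; run() references
+    # handleLineBlend but it was never defined in the original file.
+    def handleLineBlend(self, line, current_file):
+        if line == 'end':
+            self._queue.put(("blend", "end", current_file,))
+            return
+        t = re.search(r"Fra:\d+", line)
+        if t is not None:
+            self._queue.put(("blend", t[0], current_file,))
+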
+ def run(self):
+ self.start_process_handle_queue(self.ARGS_PUSH.format(self.localFolder, self.remotePath), self.handleLinePush)
+
+ for blendFile in self.blendFileList:
+ if blendFile.blending_enabled:
+ remoteBlendFilePath = blendFile.path.replace(self.localFolder, self.remoteFolder)
+ self.start_process_handle_queue(self.ARGS_BLEND.format(remoteBlendFilePath), self.handleLineBlend, blendFile)
+
+ pullWorker = Thread(
+ target=self.start_process_handle_queue,
+ args=(self.ARGS_PULL.format(self.remotePath, self.localFolder) , self.handleLinePull, blendFile)
+ )
+ pullWorker.start()
+
+ def start_process_handle_queue(self, args, lineHandler, current_file = None):
+ if self.stopped():
+ return
+ print(args)
+ p = Popen(args, shell=True, stdout=PIPE, stderr=STDOUT, universal_newlines=True, bufsize=1)
+ last_line = ''
+ while not self.stopped():
+ line = p.stdout.readline()
+ if line == '':
+ if p.poll() is not None:
+ break
+ elif line != last_line:
+ lineHandler(line, current_file)
+ last_line = line
+
+ if self.stopped():
+ p.kill()
+
+ lineHandler('end', current_file)
+
+
+class ProjectManagerPanel(Panel):
+ bl_idname = "OBJECT_PT_project_manager"
+ bl_label = "Project Management"
+ bl_space_type = 'PROPERTIES'
+ bl_region_type = 'WINDOW'
+ bl_context = ""
+
+ def draw(self, context):
+ layout = self.layout
+
+ layout.operator(ProjectManagerStartBlend.bl_idname)
+
+ configuration = context.window_manager.project_manager
+ layout.label(text="Configuration")
+ layout = self.layout
+
+ row = layout.row(align=True)
+ row.prop(configuration, "project_folder")
+ row = layout.row(align=True)
+ row.prop(configuration, "remote_host")
+ row = layout.row(align=True)
+ row.prop(configuration, "remote_folder")
+ row = layout.row(align=True)
+ row.prop(configuration, "upload_status")
+
+ layout.operator(ProjectManagerRefreshBlenFileOperator.bl_idname)
+ fileList = configuration.blend_file_list
+ for blendFile in sorted(fileList.values(), key=lambda file: file.path):
+ row = layout.row()
+ row.prop(blendFile, "blending_enabled", text=bpy.path.relpath(blendFile.path))
+ row.prop(blendFile, "status", text="")
+
+
+def register():
+ bpy.utils.register_module(__name__)
+    bpy.types.WindowManager.project_manager = PointerProperty(type=PushBlendPullProperties)
+
+
+def unregister():
+ bpy.utils.unregister_module(__name__)
+    del bpy.types.WindowManager.project_manager
+
+
+if __name__ == "__main__":
+ register()
diff --git a/tests/pitou.blend b/tests/pitou.blend
new file mode 100644
index 0000000..117d0ac
Binary files /dev/null and b/tests/pitou.blend differ
diff --git a/tests/pitou.blend1 b/tests/pitou.blend1
new file mode 100644
index 0000000..54608c2
Binary files /dev/null and b/tests/pitou.blend1 differ
diff --git a/tests/test.sh b/tests/test.sh
new file mode 100755
index 0000000..9795fe7
--- /dev/null
+++ b/tests/test.sh
@@ -0,0 +1,13 @@
+#!/bin/bash
+
+echo $1 "start"
+sleep 1
+echo $1 "1/4"
+sleep 1
+echo $1 "2/4"
+sleep 1
+echo $1 "3/4"
+sleep 1
+echo $1 "4/4"
+sleep 1
+echo $1 "fin"
diff --git a/tests/test_blendfile.blend b/tests/test_blendfile.blend
new file mode 100644
index 0000000..b10b8ef
Binary files /dev/null and b/tests/test_blendfile.blend differ
diff --git a/tests/test_blendfile.blend1 b/tests/test_blendfile.blend1
new file mode 100644
index 0000000..1ac3c8e
Binary files /dev/null and b/tests/test_blendfile.blend1 differ
diff --git a/tests/test_dispatcher.py b/tests/test_dispatcher.py
new file mode 100644
index 0000000..7b627cb
--- /dev/null
+++ b/tests/test_dispatcher.py
@@ -0,0 +1,135 @@
+import os
+import re
+import json
+import logging
+
+from queue import Queue, Empty
+from threading import Thread, Event
+from subprocess import Popen, PIPE, STDOUT
+import requests
+
+from twisted.internet import reactor
+from twisted.web import server, resource, http
+
+logging.basicConfig(level=logging.INFO)
+
+class Index(resource.Resource):
+ isLeaf = False
+
+ def getChild(self, path, request):
+ if path == b'':
+ return self
+ return resource.Resource.getChild(self, path, request)
+
+ def render_GET(self, request):
+        return b''
+
+
+class StatusResource(resource.Resource):
+ isLeaf = True
+
+ def __init__(self):
+ self.subscriptions = []
+ self.last_status = ('idle',)
+
+ def response_callback(self, err, request):
+ request.finish
+ logging.info('Request Handler: connection either resulted in error or client closed')
+ self.subscriptions.remove(request)
+
+ def add_event_stream_headers(self, request):
+ request.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
+ request.setHeader("Access-Control-Allow-Origin", "*")
+ return request
+
+ def add_json_headers(self, request):
+ request.setHeader('Content-Type', 'application/json; charset=utf-8')
+ return request
+
+ def publish(self, status):
+ self.last_status = status
+ for req in self.subscriptions:
+ self.push_sse_mesage(req=req, msg=status)
+
+ def push_sse_mesage(self, req, msg):
+ event_line = "data: {}\r\n".format(json.dumps(msg)) + '\r\n'
+ req.write(event_line.encode())
+
+ def render_GET(self, request):
+ accept_header = request.requestHeaders.getRawHeaders(b'accept')
+ if b'text/event-stream' in accept_header:
+ request = self.add_event_stream_headers(request) # 2. Format Headers
+            request.write(b"")
+ request.notifyFinish().addBoth(self.response_callback, request)
+ self.subscriptions.append(request)
+ logging.info('Request Handler: new subscriber ({})'.format(len(self.subscriptions)))
+
+ return server.NOT_DONE_YET
+ else:
+ return json.dumps(self.last_status).encode()
+
+
+class JobResource(resource.Resource):
+ isLeaf = True
+
+ def __init__(self, fake_worker):
+ self.fake_worker = fake_worker
+
+ def render_POST(self, request):
+ self.fake_worker.start_fake_working()
+ return b'OK'
+
+ def render_GET(self, request):
+ return json.dumps({
+ "current_job": '',
+ "queue": []
+ }).encode()
+
+
+def main():
+ host = ''
+ port = 8025
+
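+    # Fake-worker test bed: bind a worker to every 127.0.0.x loopback address
+    # so the dispatcher can be exercised against many "machines" on one host.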
+ for x in range(1, 255):
+ interface = '127.0.0.{}'.format(x)
+ root = Index()
+ status_output = StatusResource()
+ worker = FakeWorker(status_output)
+ root.putChild(b'status', status_output)
+ root.putChild(b'job', JobResource(worker))
+ site = server.Site(root)
+ reactor.listenTCP(port, site, interface=interface)
+
+    reactor.addSystemEventTrigger("before", "shutdown", worker.stop_worker)
+ logging.info('Starting server {}:{}'.format(host, port))
+ reactor.run()
+
+def send_request_to_dispatcher():
+ request_param = {
+ 'workers': '',
+ 'workpile': '',
+ 'project_path': ''
+ }
+ requests.post('http://localhost:8026/workpile', data=request_param)
+
+class FakeWorker:
+ def __init__(self, status_output):
+ self._stop_working = Event()
+ self._cancel_job = Event()
+ self._status_output = status_output
+
+ self._last_output_status = ''
+ self._current_job = None
+
+ def start_fake_working(self):
+ worker_thread = Thread(target=self._do_fake_work)
+ worker_thread.start()
+
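+    def stop_worker(self):
+        # Assumed stop hook: main() registers a shutdown callback that the
+        # original class never defined. It only sets the event; _do_fake_work
+        # does not currently check it.
+        self._stop_working.set()
+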
+ def _do_fake_work(self):
+ for x in range(1,1000):
+ self._status_output.publish(('fakeblendfile>fakescene', "Fra:{}".format(x),))
+ self._status_output.publish(('fakeblendfile>fakescene', "done",))
+
+
+if __name__ == "__main__":
+ main()
diff --git a/tests/thom.blend b/tests/thom.blend
new file mode 100644
index 0000000..daf9d56
Binary files /dev/null and b/tests/thom.blend differ
diff --git a/tests/thom.blend1 b/tests/thom.blend1
new file mode 100644
index 0000000..15de66d
Binary files /dev/null and b/tests/thom.blend1 differ
diff --git a/worker.py b/worker.py
new file mode 100644
index 0000000..8080c9f
--- /dev/null
+++ b/worker.py
@@ -0,0 +1,480 @@
+
+import os
+import re
+import time
+import json
+import logging
+
+from queue import Queue, Empty
+from threading import Thread, Event
+from subprocess import Popen, PIPE, STDOUT
+
+from twisted.internet import reactor
+from twisted.web import server, resource, http
+
+logging.basicConfig(level=logging.INFO)
+
+
+class Index(resource.Resource):
+ isLeaf = False
+
+ def getChild(self, path, request):
+ if path == b'':
+ return self
+ return resource.Resource.getChild(self, path, request)
+
+ def render_GET(self, request):
+ return get_index_content()
+
+
+class StatusResource(resource.Resource):
+ isLeaf = True
+
+ def __init__(self):
+ self.subscriptions = []
+ self.last_status = ('idle',)
+
+ def response_callback(self, err, request):
+ request.finish
+ logging.info('Request Handler: connection either resulted in error or client closed')
+ self.subscriptions.remove(request)
+
+ def add_event_stream_headers(self, request):
+ request.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
+ request.setHeader("Access-Control-Allow-Origin", "*")
+ return request
+
+ def add_json_headers(self, request):
+ request.setHeader('Content-Type', 'application/json; charset=utf-8')
+ return request
+
+ def publish(self, status):
+ self.last_status = status
+ for req in self.subscriptions:
+ self.push_sse_mesage(req=req, msg=status)
+
+ def push_sse_mesage(self, req, msg):
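+        # Server-Sent Events framing: one "data: <json>" line followed by a
+        # blank line per message; sseclient on the dispatcher side reassembles
+        # these into events.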
+ event_line = "data: {}\r\n".format(json.dumps(msg)) + '\r\n'
+ req.write(event_line.encode())
+
+ def render_GET(self, request):
+ accept_header = request.requestHeaders.getRawHeaders(b'accept')
+ if b'text/event-stream' in accept_header:
+ request = self.add_event_stream_headers(request) # 2. Format Headers
+            request.write(b"")
+ request.notifyFinish().addBoth(self.response_callback, request)
+ self.subscriptions.append(request)
+ logging.info('Request Handler: new subscriber ({})'.format(len(self.subscriptions)))
+
+ return server.NOT_DONE_YET
+ else:
+ return json.dumps(self.last_status).encode()
+
+
+class JobResource(resource.Resource):
+ isLeaf = True
+ PARAM_BLEND_FILE = b'blend_file'
+ PARAM_SCENE = b'scene'
+ PARAM_START_FRAME = b'start_frame'
+ PARAM_END_FRAME = b'end_frame'
+
+ def __init__(self, worker):
+ self.worker = worker
+
+ @classmethod
+ def get_param_blend_file(cls, request):
+ if cls.PARAM_BLEND_FILE not in request.args or not request.args[cls.PARAM_BLEND_FILE][0].endswith(b'.blend'):
+ raise ValueError
+
+ blend_file = request.args[cls.PARAM_BLEND_FILE][0].decode("utf-8")
+ if not os.path.isfile(blend_file):
+ raise FileNotFoundError
+
+ return blend_file
+
+ @classmethod
+ def get_param_scene(cls, request):
+ if cls.PARAM_SCENE not in request.args or request.args[cls.PARAM_SCENE][0] == b'':
+ return None
+
+ return request.args[cls.PARAM_SCENE][0].decode("utf-8")
+
+ @classmethod
+ def get_param_start_frame(cls, request):
+ if cls.PARAM_START_FRAME not in request.args or request.args[cls.PARAM_START_FRAME][0] == b'':
+ return None
+
+ start_frame = request.args[cls.PARAM_START_FRAME][0]
+ if not start_frame.isdigit():
+ raise ValueError
+
+ return int(start_frame)
+
+ @classmethod
+ def get_param_end_frame(cls, request):
+ if cls.PARAM_END_FRAME not in request.args or request.args[cls.PARAM_END_FRAME][0] == b'':
+ return None
+
+ end_frame = request.args[cls.PARAM_END_FRAME][0]
+ if not end_frame.isdigit():
+ raise ValueError
+
+ return int(end_frame)
+
+ def prepare_job(self, request):
+ return Job(
+ self.get_param_blend_file(request),
+ scene=self.get_param_scene(request),
+ start_frame=self.get_param_start_frame(request),
+ end_frame=self.get_param_end_frame(request)
+ )
+
+ def render_POST(self, request):
+ try:
+ job = self.prepare_job(request)
+ logging.info('Request Handler: received a job request: {}'.format(job))
+ self.worker.enqueue_job(job)
+ return b'OK'
+ except ValueError:
+            return resource.ErrorPage(http.BAD_REQUEST, "Bad request",
+                                      "Error in the request's format").render(request)
+        except FileNotFoundError:
+            return resource.NoResource("Blend file not found").render(request)
+
+ def render_GET(self, request):
+ return json.dumps({
+ "current_job": str(self.worker.get_current_job()),
+ "queue": [str(job) for job in self.worker.get_job_queue()]
+ }).encode()
+
+
+class CancelResource(resource.Resource):
+ isLeaf = True
+ PARAM_TYPE = b'type'
+ PARAM_JOBS = b'jobs'
+
+ CANCEL_TYPE_QUEUED_JOBS = b'queued_jobs'
+ CANCEL_TYPE_QUEUE = b'queue'
+ CANCEL_TYPE_CURRENT_JOB = b'current_job'
+ CANCEL_TYPE_ALL = b'all'
+
+ def __init__(self, worker):
+ self.worker = worker
+
+ @classmethod
+ def get_param_type(cls, request):
+ if cls.PARAM_TYPE not in request.args:
+ raise ValueError
+
+ param_type = request.args[cls.PARAM_TYPE][0]
+ available_type = [cls.CANCEL_TYPE_QUEUED_JOBS, cls.CANCEL_TYPE_QUEUE,
+ cls.CANCEL_TYPE_CURRENT_JOB, cls.CANCEL_TYPE_ALL]
+ if param_type not in available_type:
+ raise ValueError
+
+ return param_type
+
+ @classmethod
+ def get_param_jobs(cls, request):
+ if cls.PARAM_JOBS not in request.args:
+ raise ValueError
+
+ jobs = request.args[cls.PARAM_JOBS]
+
+ return [job.decode("utf-8") for job in jobs]
+
+ def render_POST(self, request):
+ logging.info('Request Handler: received cancel request')
+ try:
+ type_param = self.get_param_type(request)
+ if type_param == self.CANCEL_TYPE_QUEUED_JOBS:
+ self.worker.remove_job_list_from_queue(self.get_param_jobs(request))
+ elif type_param == self.CANCEL_TYPE_QUEUE:
+ self.worker.clear_job_queue()
+ elif type_param == self.CANCEL_TYPE_CURRENT_JOB:
+ self.worker.cancel_current_job()
+ elif type_param == self.CANCEL_TYPE_ALL:
+ self.worker.clear_job_queue()
+ self.worker.cancel_current_job()
+ except ValueError:
+            return resource.ErrorPage(http.BAD_REQUEST, "Bad request",
+                                      "Error in the request's format").render(request)
+
+ return b'OK'
+
+
+def main():
+ host = ''
+ port = 8025
+
+ root = Index()
+ status_output = StatusResource()
+ worker = QueueConsumer(status_output)
+ root.putChild(b'status', status_output)
+ root.putChild(b'job', JobResource(worker))
+ root.putChild(b'cancel', CancelResource(worker))
+ site = server.Site(root)
+ reactor.listenTCP(port, site)
+
+ worker.start_worker()
+ reactor.addSystemEventTrigger("before", "shutdown", worker.stop_worker)
+
+ logging.info('Starting server {}:{}'.format(host, port))
+ reactor.run()
+
+
+class QueueConsumer:
+ def __init__(self, status_output):
+ self._job_queue = EditableQueue()
+ self._stop_working = Event()
+ self._cancel_job = Event()
+ self._status_output = status_output
+
+ self._last_output_status = ''
+ self._current_job = None
+
+ def start_worker(self):
+ self._stop_working.clear()
+ worker_thread = Thread(target=self._consume_queue)
+ worker_thread.start()
+
+ def stop_worker(self):
+ self._stop_working.set()
+
+ def enqueue_job(self, job):
+ self._job_queue.put(job)
+
+ def get_job_queue(self):
+ return self._job_queue.as_list()
+
+ def get_current_job(self):
+ return self._current_job
+
+ def cancel_current_job(self):
+ self._cancel_job.set()
+
+ def remove_job_list_from_queue(self, job_list):
+ removed_jobs = self._job_queue.remove_sublist(job_list)
+ for job in removed_jobs:
+ self._status_output.publish((str(job), Job.STATUS_CANCELED,))
+
+ def clear_job_queue(self):
+ removed_jobs = self._job_queue.clear()
+ for job in removed_jobs:
+ self._status_output.publish((str(job), Job.STATUS_CANCELED,))
+
+ def _consume_queue(self):
+ while not self._stop_working.is_set():
+ try:
+ job = self._job_queue.get(timeout=1)
+ except Empty:
+ continue
+ self._current_job = job
+ self._execute_job()
+ self._current_job = None
+
+ def _execute_job(self):
+ logging.info('Worker: starting a new job: {}'.format(self._current_job))
+ start_time = time.time()
+ p = Popen(["/usr/bin/blender"] + self._current_job.get_blender_args(),
+ stdout=PIPE, stderr=STDOUT, universal_newlines=True, bufsize=1)
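+        # Stream blender's stdout line by line: publish frame progress as it
+        # appears, report 'done' with the elapsed time when the process exits,
+        # and bail out early if a cancel was requested through /cancel.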
+ while not self._cancel_job.is_set():
+ line = p.stdout.readline()
+ if line == '':
+ if p.poll() is not None:
+ logging.info('Worker: job finished: {}'.format(self._current_job))
+ self._status_output.publish((str(self._current_job), Job.STATUS_DONE, time.time() - start_time,))
+ break
+ else:
+ status = self._parse_stdout(line)
+ if status is not None:
+ logging.info('Worker: job status is: {}'.format(status))
+ self._status_output.publish((str(self._current_job), status,))
+
+ if self._cancel_job.is_set():
+ logging.info('Worker: canceling: {}'.format(self._current_job))
+ p.kill()
+ self._cancel_job.clear()
+ self._status_output.publish((str(self._current_job), Job.STATUS_CANCELED,))
+
+ def _parse_stdout(self, line):
+ logging.debug('Worker: getting output: {}'.format(line))
+ regex_blend_status = "Fra:\d+"
+ t = re.search(regex_blend_status, line)
+ if t is not None and t[0] != self._last_output_status:
+ self._last_output_status = t[0]
+ return t[0]
+ else:
+ return None
+
+
+class EditableQueue(Queue):
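+    # Queue subclass that exposes its backing deque for inspection and removal.
+    # self.not_empty is a Condition built on the queue's own mutex, so
+    # "with self.not_empty:" serializes these helpers with get()/put().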
+ def remove_sublist(self, item_list):
+ with self.not_empty:
+ removed_items = []
+ for item in item_list:
+ try:
+ self.queue.remove(item)
+ except ValueError:
+ pass
+ else:
+ removed_items.append(item)
+ return removed_items
+
+ def clear(self):
+ with self.not_empty:
+ removed_items = list(self.queue)
+ self.queue.clear()
+ return removed_items
+
+ def as_list(self):
+ with self.not_empty:
+ return list(self.queue)
+
+
+class Job:
+ STATUS_DONE = 'done'
+ STATUS_CANCELED = 'canceled'
+
+ def __init__(self, blend_file, **kwargs):
+ self.blend_file = blend_file
+ self.scene = kwargs["scene"] if "scene" in kwargs else None
+ self.start_frame = kwargs["start_frame"] if "start_frame" in kwargs else None
+ self.end_frame = kwargs["end_frame"] if "end_frame" in kwargs else None
+
+ def __str__(self):
+ return "{}>{} {}:{}".format(
+ self.blend_file,
+ self.scene if self.scene is not None else "default_scene",
+ self.start_frame if self.start_frame is not None else "start",
+ self.end_frame if self.end_frame is not None else "end"
+ )
+
+ def get_blender_args(self):
+ args = ['--background', self.blend_file]
+
+ if self.scene is not None:
+ args.append('--scene')
+ args.append(self.scene)
+
+ if self.start_frame is not None:
+ args.append('--frame-start')
+ args.append(str(self.start_frame))
+
+ if self.end_frame is not None:
+ args.append('--frame-end')
+ args.append(str(self.end_frame))
+
+ args.append('--render-anim')
+
+ return args
+
+ def __eq__(self, other):
+ if isinstance(other, str):
+ return other == str(self)
+
+        return other.blend_file == self.blend_file and other.scene == self.scene \
+ and other.start_frame == self.start_frame \
+ and other.end_frame == self.end_frame
+
+
+def get_index_content():
+ return '''
+
+
+
+
+
+
+
+
+
+'''.encode()
+
+
+if __name__ == "__main__":
+ main()