575 lines
19 KiB
Python
575 lines
19 KiB
Python
import os
|
|
import re
|
|
import math
|
|
import json
|
|
import logging
|
|
|
|
from threading import Thread, Event, RLock
|
|
from subprocess import Popen, PIPE, STDOUT
|
|
|
|
import requests
|
|
from requests.exceptions import ConnectionError
|
|
|
|
from twisted.internet import reactor
|
|
from twisted.web import server, resource, http
|
|
|
|
import sseclient
|
|
import blendfile as bf
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
PROJECT_PATH = "/home/ewandor/projects/dev/blender/PushBlendPull"
|
|
|
|
class Index(resource.Resource):
|
|
isLeaf = False
|
|
|
|
def getChild(self, path, request):
|
|
if path == b'':
|
|
return self
|
|
return resource.Resource.getChild(self, path, request)
|
|
|
|
def render_GET(self, request):
|
|
return get_index_content()
|
|
|
|
|
|
class WorkpileResource(resource.Resource):
|
|
isLeaf = True
|
|
PARAM_WORKERS = b'workers'
|
|
PARAM_WORKPILE = b'workpile'
|
|
PARAM_PROJECT_PATH = b'project_path'
|
|
|
|
@classmethod
|
|
def get_param_workers(cls, request):
|
|
if cls.PARAM_WORKERS not in request.args:
|
|
raise ValueError
|
|
|
|
regex_worker = "^(?P<server>[^:]+(:\d+)?[^:]*):(?P<remote_path>.+)$"
|
|
workers = []
|
|
for worker in request.args[cls.PARAM_WORKERS]:
|
|
match = re.search(regex_worker, worker.decode("utf-8"))
|
|
if match is None:
|
|
raise ValueError
|
|
workers.append((match.group('server'), match.group('remote_path'),))
|
|
|
|
return workers
|
|
|
|
@classmethod
|
|
def get_param_workpile(cls, request):
|
|
if cls.PARAM_WORKPILE not in request.args:
|
|
raise ValueError
|
|
|
|
workpile = []
|
|
for work in request.args[cls.PARAM_WORKPILE]:
|
|
result = work.decode("utf-8").split('>')
|
|
if len(result) != 2:
|
|
raise ValueError
|
|
workpile.append(tuple(result))
|
|
|
|
return workpile
|
|
|
|
@classmethod
|
|
def get_param_project_path(cls, request):
|
|
if cls.PARAM_PROJECT_PATH not in request.args:
|
|
raise ValueError
|
|
|
|
project_path = request.args[cls.PARAM_PROJECT_PATH][0].decode("utf-8")
|
|
if not os.path.isdir(project_path):
|
|
raise FileNotFoundError
|
|
|
|
return project_path
|
|
|
|
def render_POST(self, request):
|
|
try:
|
|
logging.info('Request Handler: Received a new work pile')
|
|
stats = JobDispatcher.process_workpile(
|
|
self.get_param_workers(request),
|
|
self.get_param_project_path(request),
|
|
self.get_param_workpile(request)
|
|
)
|
|
return b'OK'
|
|
except ValueError:
|
|
return resource.ErrorPage(http.BAD_REQUEST, "Bad request", "Error in the request's format")
|
|
except FileNotFoundError:
|
|
return resource.NoResource("Blend file not found")
|
|
pass
|
|
|
|
|
|
class StatusResource(resource.Resource):
|
|
isLeaf = True
|
|
|
|
def render_GET(self, request):
|
|
return json.dumps(get_status()).encode()
|
|
|
|
|
|
class Job:
|
|
STATUS_QUEUED = 'queued'
|
|
STATUS_DONE = 'done'
|
|
STATUS_CANCELED = 'canceled'
|
|
|
|
def __init__(self, blend_file, scene, start_frame, end_frame, local_project_path, remote_project_path):
|
|
self.blend_file = blend_file
|
|
self.scene = scene
|
|
self.start_frame = start_frame
|
|
self.end_frame = end_frame
|
|
|
|
self.local_project_path = local_project_path
|
|
self.remote_project_path = remote_project_path
|
|
|
|
self.current_frame = 0
|
|
self.status = self.STATUS_QUEUED
|
|
|
|
def __str__(self):
|
|
return "{}>{} {}:{}".format(
|
|
self.blend_file,
|
|
self.scene if self.scene is not None else "default_scene",
|
|
self.start_frame if self.start_frame is not None else "start",
|
|
self.end_frame if self.end_frame is not None else "end"
|
|
)
|
|
|
|
def get_http_payload(self):
|
|
result = {'blend_file': self.blend_file}
|
|
if self.scene is not None:
|
|
result['scene'] = self.scene
|
|
if self.start_frame is not None:
|
|
result['start_frame'] = self.start_frame
|
|
if self.end_frame is not None:
|
|
result['end_frame'] = self.end_frame
|
|
|
|
return result
|
|
|
|
def __eq__(self, other):
|
|
if isinstance(other, str):
|
|
return other == str(self)
|
|
|
|
return other == self.blend_file and other.scene == self.scene \
|
|
and other.start_frame == self.start_frame \
|
|
and other.end_frame == self.end_frame
|
|
|
|
|
|
class Worker:
|
|
path_status = '/status'
|
|
path_job = '/job'
|
|
path_cancel = '/cancel'
|
|
worker_pool = []
|
|
finished_jobs = []
|
|
|
|
@classmethod
|
|
def get_worker(cls, authority):
|
|
try:
|
|
worker = cls.worker_pool[cls.worker_pool.index(authority)]
|
|
except ValueError:
|
|
worker = Worker(authority)
|
|
|
|
return worker
|
|
|
|
def __init__(self, authority):
|
|
self.authority = authority
|
|
self.job_queue = []
|
|
self.idle = True
|
|
|
|
self._upload_lock = RLock()
|
|
self._download_lock = RLock()
|
|
self.upload_status = None
|
|
self.download_status = None
|
|
|
|
self.worker_pool.append(self)
|
|
|
|
def get_http_address(self, path):
|
|
return 'http://' + self.authority + path
|
|
|
|
def check_online(self):
|
|
try:
|
|
requests.get(self.get_http_address(self.path_status))
|
|
except ConnectionError:
|
|
return False
|
|
return True
|
|
|
|
def process_joblist(self, project_path, remote_path, joblist):
|
|
reactor.callInThread(self._do_process_joblist, project_path, remote_path, joblist)
|
|
#Thread(target=self._do_process_joblist, args=(project_path, remote_path, joblist,)).start()
|
|
|
|
def _do_process_joblist(self, project_path, remote_path, joblist):
|
|
logging.info('Worker<{}>: Synchronizing worker'.format(self))
|
|
self.push_to(project_path, remote_path)
|
|
for job in joblist:
|
|
self.enqueue_job(job)
|
|
|
|
def push_to(self, project_path, remote_path):
|
|
self._upload_lock.acquire()
|
|
for upload_status in RemoteSynchronizer.push_to_worker(self, project_path, remote_path):
|
|
self.upload_status = upload_status
|
|
|
|
self.upload_status = None
|
|
self._upload_lock.release()
|
|
|
|
def pull_from(self, project_path, remote_path):
|
|
self._download_lock.acquire()
|
|
for download_status in RemoteSynchronizer.pull_from_worker(self, project_path, remote_path):
|
|
self.download_status = download_status
|
|
|
|
self.download_status = None
|
|
self._download_lock.release()
|
|
|
|
def enqueue_job(self, job):
|
|
self.job_queue.append(job)
|
|
if self.idle:
|
|
ready_event = Event()
|
|
Thread(target=self.start_listening, args=(ready_event,)).start()
|
|
ready_event.wait()
|
|
logging.info('Worker<{}>: Sending {}'.format(self, job))
|
|
requests.post(self.get_http_address(self.path_job), data=job.get_http_payload())
|
|
|
|
def start_listening(self, ready_event):
|
|
response = requests.get(
|
|
self.get_http_address(self.path_status),
|
|
stream=True,
|
|
headers={'accept': 'text/event-stream'}
|
|
)
|
|
status_output = sseclient.SSEClient(response)
|
|
stop = False
|
|
self.idle = False
|
|
ready_event.set()
|
|
for event in status_output.events():
|
|
event_data = json.loads(event.data)
|
|
if event_data[1] == Job.STATUS_CANCELED:
|
|
if event_data[0] in self.job_queue:
|
|
job = self.job_queue.pop(self.job_queue.index(event_data[0]))
|
|
job.status = Job.STATUS_CANCELED
|
|
self.finished_jobs.append(job)
|
|
|
|
elif event_data[0] == self.job_queue[0]:
|
|
if event_data[1] == Job.STATUS_DONE:
|
|
job = self.job_queue.pop(0)
|
|
job.status = Job.STATUS_DONE
|
|
self.finished_jobs.append(job)
|
|
logging.info('Worker<{}>: Job done {}, starting syncing local files'.format(self, job))
|
|
Thread(target=self.pull_from, args=(job.local_project_path, job.remote_project_path)).start()
|
|
if len(self.job_queue) == 0:
|
|
stop = True
|
|
elif event_data[1][:4] == 'Fra:':
|
|
self.job_queue[0].current_frame = int(event_data[1][4:])
|
|
if stop:
|
|
status_output.close()
|
|
self.idle = True
|
|
return
|
|
|
|
def __eq__(self, other):
|
|
if isinstance(other, str):
|
|
return other == self.authority
|
|
|
|
return other.authority == self.authority
|
|
|
|
def __str__(self):
|
|
return self.authority
|
|
|
|
def __hash__(self):
|
|
return self.authority.__hash__()
|
|
|
|
|
|
class RemoteSynchronizer:
|
|
@classmethod
|
|
def push_to_worker(cls, worker, local_path, remote_path):
|
|
address = worker.authority.split(':')[0]
|
|
|
|
destination = "{}:{}".format(address, remote_path)
|
|
return cls._do_sync(local_path, destination, ['*.blend1', 'README.md'])
|
|
|
|
@classmethod
|
|
def pull_from_worker(cls, worker, local_path, remote_path):
|
|
address = worker.authority.split(':')[0]
|
|
|
|
source = "{}:{}".format(address, remote_path)
|
|
return cls._do_sync(source, local_path, ['*.blend'])
|
|
|
|
@classmethod
|
|
def _do_sync(cls, source, destination, exclude_list):
|
|
args = ['rsync', '-azP', '--info=progress2']
|
|
for exclude_patern in exclude_list:
|
|
args.append('--exclude')
|
|
args.append(exclude_patern)
|
|
|
|
args.append(source)
|
|
args.append(destination)
|
|
|
|
p = Popen(args, stdout=PIPE, stderr=STDOUT, universal_newlines=True, bufsize=1)
|
|
last_line = ''
|
|
while True:
|
|
line = p.stdout.readline()
|
|
if line == '':
|
|
if p.poll() is not None:
|
|
break
|
|
elif line != last_line:
|
|
logging.debug('Synch: stdout: {}'.format(line))
|
|
last_line = line
|
|
|
|
status = cls.handleLine(line)
|
|
if status is not None:
|
|
yield status
|
|
|
|
|
|
REGEX_PUSHPULL_STATUS = "\d+%"
|
|
@classmethod
|
|
def handleLine(cls, line, current_file=None):
|
|
if line == 'end':
|
|
return line
|
|
t = re.search(cls.REGEX_PUSHPULL_STATUS, line)
|
|
if t is not None:
|
|
return t[0]
|
|
|
|
|
|
def get_status():
|
|
workers_status = {}
|
|
subjobs_status = {}
|
|
for worker in Worker.worker_pool:
|
|
for subjob in worker.finished_jobs:
|
|
jobkey = (subjob.blend_file, subjob.scene)
|
|
if jobkey not in subjobs_status:
|
|
subjobs_status[jobkey] = []
|
|
|
|
subjobs_status[jobkey].append(subjob)
|
|
|
|
job_queue_status = []
|
|
for subjob in worker.job_queue:
|
|
job_queue_status.append(get_subjob_stat(subjob))
|
|
jobkey = (subjob.blend_file, subjob.scene)
|
|
if jobkey not in subjobs_status:
|
|
subjobs_status[jobkey] = []
|
|
|
|
subjobs_status[jobkey].append(subjob)
|
|
workers_status[worker.authority] = {
|
|
'upload_status': worker.upload_status,
|
|
'download_status':worker.download_status,
|
|
'job_queue': job_queue_status
|
|
}
|
|
jobs_status = {}
|
|
for job, subjobs in subjobs_status.items():
|
|
total_frame = 0
|
|
frame_done = 0
|
|
for subjob in subjobs:
|
|
total_frame += subjob.end_frame - subjob.start_frame + 1
|
|
frame_done += subjob.current_frame - subjob.start_frame + 1
|
|
jobs_status['>'.join(job)] = {'frame_done': frame_done, 'total_frame': total_frame}
|
|
|
|
return {
|
|
'project_path': PROJECT_PATH,
|
|
'workers': workers_status,
|
|
'jobs': jobs_status
|
|
}
|
|
|
|
|
|
def get_subjob_stat(subjob):
|
|
return {
|
|
'name': str(subjob),
|
|
'current_frame': subjob.current_frame,
|
|
'total_frame': subjob.end_frame - subjob.start_frame + 1
|
|
}
|
|
|
|
|
|
class JobDispatcher:
|
|
|
|
@staticmethod
|
|
def process_workpile(worker_list, project_path, workpile):
|
|
checked_worker_list = []
|
|
remote_paths = {}
|
|
for worker_param in worker_list:
|
|
worker = Worker.get_worker(worker_param[0])
|
|
if worker.check_online():
|
|
checked_worker_list.append((worker, worker_param[1],))
|
|
|
|
dispatched_jobs = {w: [] for w in checked_worker_list}
|
|
for blend_file, scene in workpile:
|
|
dispatch = JobDispatcher.dispatch_scene_to_workers(
|
|
checked_worker_list,
|
|
project_path, blend_file, scene)
|
|
for worker, job in dispatch.items():
|
|
dispatched_jobs[worker].append(job)
|
|
|
|
for (worker, joblist) in dispatched_jobs.items():
|
|
worker[0].process_joblist(project_path, worker[1], joblist)
|
|
|
|
return dispatched_jobs
|
|
|
|
@staticmethod
|
|
def dispatch_scene_to_workers(worker_list, project_path, blend_file, scene):
|
|
blend = bf.open_blend(blend_file)
|
|
scenes_info = retrieve_blenderfile_scenes_info(blend)
|
|
if scene not in scenes_info:
|
|
raise ValueError
|
|
|
|
start_frame, end_frame = scenes_info[scene]
|
|
|
|
undispatched_frames = end_frame - start_frame + 1
|
|
available_workers = len(worker_list)
|
|
|
|
next_start_frame = start_frame
|
|
round_batch = False
|
|
job_dispatch = {}
|
|
for worker, remote_project_path in worker_list:
|
|
if not round_batch and undispatched_frames % available_workers != 0:
|
|
batch = math.ceil(undispatched_frames / available_workers)
|
|
elif not round_batch:
|
|
batch = undispatched_frames // available_workers
|
|
round_batch = True
|
|
|
|
job = Job(
|
|
blend_file.replace(project_path, remote_project_path),
|
|
scene, next_start_frame, next_start_frame + batch - 1,
|
|
project_path, remote_project_path)
|
|
|
|
job_dispatch[(worker, remote_project_path,)] = job
|
|
|
|
next_start_frame += batch
|
|
undispatched_frames -= batch
|
|
available_workers -= 1
|
|
|
|
return job_dispatch
|
|
|
|
|
|
def main():
|
|
host = ''
|
|
port = 8026
|
|
|
|
root = Index()
|
|
|
|
root.putChild(b'status', StatusResource())
|
|
root.putChild(b'workpile', WorkpileResource())
|
|
# root.putChild(b'cancel', CancelResource)
|
|
site = server.Site(root)
|
|
reactor.listenTCP(port, site)
|
|
|
|
logging.info('Starting server {}:{}'.format(host, port))
|
|
reactor.run()
|
|
# worker_list = [('localhost:8025', '/home/ewandor/projects/dev/blender/PushBlendPull/',)]
|
|
# project_path = '/home/ewandor/projects/dev/blender/PushBlendPull/'
|
|
# dispatch_scene_to_workers(
|
|
# worker_list,
|
|
# project_path,
|
|
# '/home/ewandor/projects/dev/blender/PushBlendPull/tests/pitou.blend',
|
|
# 'thom'
|
|
# )
|
|
|
|
|
|
def retrieve_blendfile_assets(blendfile):
|
|
assets_dna_type = ['Image', 'bSound', 'MovieClip']
|
|
assets = []
|
|
for block in blendfile.blocks:
|
|
if block.dna_type_name in assets_dna_type and block.get(b'name') != '':
|
|
assets.append(block.get(b'name'))
|
|
|
|
return assets
|
|
|
|
|
|
def retrieve_blenderfile_scenes_info(blendfile):
|
|
scenes_info = {}
|
|
scenes = [block for block in blendfile.blocks if block.code == b'SC']
|
|
for scene in scenes:
|
|
scenes_info[scene.get((b'id', b'name'))[2:]] = (
|
|
scene.get((b'r', b'sfra')),
|
|
scene.get((b'r', b'efra')),
|
|
)
|
|
|
|
return scenes_info
|
|
|
|
|
|
# blend1 = bf.open_blend('tests/test_blendfile.blend1')
|
|
# blend = bf.open_blend('tests/pitou.blend')
|
|
#
|
|
# seqs = [block for block in blend.blocks if block.dna_type_name == 'Sequence']
|
|
#
|
|
# for block in blend.blocks:
|
|
# print(block.code, '-', block.dna_type_name)
|
|
# print('coucou')
|
|
# for block in blend1.blocks:
|
|
# print(block.code, '-', block.dna_type_name)
|
|
# # assets = retrieve_blendfile_assets(blend)
|
|
# scenes_info = retrieve_blenderfile_scenes_info(blend)
|
|
#
|
|
#
|
|
# print(scenes_info[2][0])
|
|
def get_index_content():
|
|
return '''<html>
|
|
<head>
|
|
</head>
|
|
<body>
|
|
<input type="submit" value="track_status" onclick="return get_status();" />
|
|
<ul id="events"></ul>
|
|
<form action="/workpile" method="post" target="_blank" onsubmit="return submit_workpile()" id="form_workpile">
|
|
farms<br/>
|
|
<input type="checkbox" name="workers" value="hawat:8025:/home/ggentile/projects/dev/blender/PushBlendPull/" checked>hawat:8025<br/>
|
|
scenes<br/>
|
|
<input type="checkbox" name="workpile" value="/home/ewandor/projects/dev/blender/PushBlendPull/tests/pitou.blend>pitou"/>//tests/pitou.blend>pitou<br/>
|
|
<input type="checkbox" name="workpile" value="/home/ewandor/projects/dev/blender/PushBlendPull/tests/pitou.blend>thom"/>//tests/pitou.blend>thom<br/>
|
|
<input type="checkbox" name="workpile" value="/home/ewandor/projects/dev/blender/PushBlendPull/tests/thom.blend>Scene"/>//tests/thom.blend>Scene<br/>
|
|
project_path<input type="text" name="project_path" value="/home/ewandor/projects/dev/blender/PushBlendPull/"><br/>
|
|
<input type="submit" value="send_job"/>
|
|
</form>
|
|
<script type="text/javascript">
|
|
var listening = false;
|
|
function submit_workpile() {
|
|
submit_ajax(
|
|
document.getElementById("form_workpile"),
|
|
function() {get_status();}
|
|
);
|
|
return false;
|
|
}
|
|
function submit_ajax(form, callback) {
|
|
var request = new XMLHttpRequest();
|
|
request.onreadystatechange = function() {
|
|
if (request.readyState == XMLHttpRequest.DONE && request.status == 200) {
|
|
callback();
|
|
}
|
|
};
|
|
request.open("POST", form.action);
|
|
request.send(new FormData(form));
|
|
update_queue()
|
|
}
|
|
function get_status() {
|
|
if (!listening) {
|
|
var eventList = document.getElementById("events");
|
|
while (eventList.firstChild) {
|
|
eventList.removeChild(eventList.firstChild);
|
|
}
|
|
var evtSource = new EventSource("http://localhost:8025/status");
|
|
evtSource.onmessage = function(e) {
|
|
var newElement = document.createElement("li");
|
|
newElement.innerHTML = "message: " + e.data;
|
|
eventList.appendChild(newElement);
|
|
if (eventList.childNodes.length > 5) {
|
|
eventList.removeChild(eventList.firstChild);
|
|
}
|
|
}
|
|
listening = true;
|
|
}
|
|
}
|
|
|
|
function update_queue() {
|
|
var request = new XMLHttpRequest();
|
|
request.onreadystatechange = function() {
|
|
if (request.readyState == XMLHttpRequest.DONE && request.status == 200) {
|
|
var response = JSON.parse(request.responseText);
|
|
var select = document.getElementById("jobs");
|
|
while (select.firstChild) {
|
|
select.removeChild(select.firstChild);
|
|
}
|
|
response.queue.forEach(function(job) {
|
|
var newElement = document.createElement("option");
|
|
newElement.text = job;
|
|
newElement.value = job;
|
|
select.appendChild(newElement);
|
|
});
|
|
}
|
|
};
|
|
request.open('GET', "http://localhost:8025/job", true);
|
|
request.send(null);
|
|
}
|
|
document.addEventListener("DOMContentLoaded", function(event) {
|
|
document.getElementById("queued_jobs").addEventListener("click", update_queue, false);
|
|
}, false);
|
|
</script>
|
|
</body>
|
|
</html>'''.encode()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|