import os import re import math import json import logging from threading import Thread, Event, RLock from subprocess import Popen, PIPE, STDOUT import requests from requests.exceptions import ConnectionError from twisted.internet import reactor from twisted.web import server, resource, http import sseclient import blendfile as bf logging.basicConfig(level=logging.INFO) PROJECT_PATH = "/home/ewandor/projects/dev/blender/PushBlendPull" class Index(resource.Resource): isLeaf = False def getChild(self, path, request): if path == b'': return self return resource.Resource.getChild(self, path, request) def render_GET(self, request): return get_index_content() class WorkpileResource(resource.Resource): isLeaf = True PARAM_WORKERS = b'workers' PARAM_WORKPILE = b'workpile' PARAM_PROJECT_PATH = b'project_path' @classmethod def get_param_workers(cls, request): if cls.PARAM_WORKERS not in request.args: raise ValueError regex_worker = "^(?P[^:]+(:\d+)?[^:]*):(?P.+)$" workers = [] for worker in request.args[cls.PARAM_WORKERS]: match = re.search(regex_worker, worker.decode("utf-8")) if match is None: raise ValueError workers.append((match.group('server'), match.group('remote_path'),)) return workers @classmethod def get_param_workpile(cls, request): if cls.PARAM_WORKPILE not in request.args: raise ValueError workpile = [] for work in request.args[cls.PARAM_WORKPILE]: result = work.decode("utf-8").split('>') if len(result) != 2: raise ValueError workpile.append(tuple(result)) return workpile @classmethod def get_param_project_path(cls, request): if cls.PARAM_PROJECT_PATH not in request.args: raise ValueError project_path = request.args[cls.PARAM_PROJECT_PATH][0].decode("utf-8") if not os.path.isdir(project_path): raise FileNotFoundError return project_path def render_POST(self, request): try: logging.info('Request Handler: Received a new work pile') stats = JobDispatcher.process_workpile( self.get_param_workers(request), self.get_param_project_path(request), self.get_param_workpile(request) ) return b'OK' except ValueError: return resource.ErrorPage(http.BAD_REQUEST, "Bad request", "Error in the request's format") except FileNotFoundError: return resource.NoResource("Blend file not found") pass class StatusResource(resource.Resource): isLeaf = True def render_GET(self, request): return json.dumps(get_status()).encode() class Job: STATUS_QUEUED = 'queued' STATUS_DONE = 'done' STATUS_CANCELED = 'canceled' def __init__(self, blend_file, scene, start_frame, end_frame, local_project_path, remote_project_path): self.blend_file = blend_file self.scene = scene self.start_frame = start_frame self.end_frame = end_frame self.local_project_path = local_project_path self.remote_project_path = remote_project_path self.current_frame = 0 self.status = self.STATUS_QUEUED def __str__(self): return "{}>{} {}:{}".format( self.blend_file, self.scene if self.scene is not None else "default_scene", self.start_frame if self.start_frame is not None else "start", self.end_frame if self.end_frame is not None else "end" ) def get_http_payload(self): result = {'blend_file': self.blend_file} if self.scene is not None: result['scene'] = self.scene if self.start_frame is not None: result['start_frame'] = self.start_frame if self.end_frame is not None: result['end_frame'] = self.end_frame return result def __eq__(self, other): if isinstance(other, str): return other == str(self) return other == self.blend_file and other.scene == self.scene \ and other.start_frame == self.start_frame \ and other.end_frame == self.end_frame class Worker: path_status = '/status' path_job = '/job' path_cancel = '/cancel' worker_pool = [] finished_jobs = [] @classmethod def get_worker(cls, authority): try: worker = cls.worker_pool[cls.worker_pool.index(authority)] except ValueError: worker = Worker(authority) return worker def __init__(self, authority): self.authority = authority self.job_queue = [] self.idle = True self._upload_lock = RLock() self._download_lock = RLock() self.upload_status = None self.download_status = None self.worker_pool.append(self) def get_http_address(self, path): return 'http://' + self.authority + path def check_online(self): try: requests.get(self.get_http_address(self.path_status)) except ConnectionError: return False return True def process_joblist(self, project_path, remote_path, joblist): reactor.callInThread(self._do_process_joblist, project_path, remote_path, joblist) #Thread(target=self._do_process_joblist, args=(project_path, remote_path, joblist,)).start() def _do_process_joblist(self, project_path, remote_path, joblist): logging.info('Worker<{}>: Synchronizing worker'.format(self)) self.push_to(project_path, remote_path) for job in joblist: self.enqueue_job(job) def push_to(self, project_path, remote_path): self._upload_lock.acquire() for upload_status in RemoteSynchronizer.push_to_worker(self, project_path, remote_path): self.upload_status = upload_status self.upload_status = None self._upload_lock.release() def pull_from(self, project_path, remote_path): self._download_lock.acquire() for download_status in RemoteSynchronizer.pull_from_worker(self, project_path, remote_path): self.download_status = download_status self.download_status = None self._download_lock.release() def enqueue_job(self, job): self.job_queue.append(job) if self.idle: ready_event = Event() Thread(target=self.start_listening, args=(ready_event,)).start() ready_event.wait() logging.info('Worker<{}>: Sending {}'.format(self, job)) requests.post(self.get_http_address(self.path_job), data=job.get_http_payload()) def start_listening(self, ready_event): response = requests.get( self.get_http_address(self.path_status), stream=True, headers={'accept': 'text/event-stream'} ) status_output = sseclient.SSEClient(response) stop = False self.idle = False ready_event.set() for event in status_output.events(): event_data = json.loads(event.data) if event_data[1] == Job.STATUS_CANCELED: if event_data[0] in self.job_queue: job = self.job_queue.pop(self.job_queue.index(event_data[0])) job.status = Job.STATUS_CANCELED self.finished_jobs.append(job) elif event_data[0] == self.job_queue[0]: if event_data[1] == Job.STATUS_DONE: job = self.job_queue.pop(0) job.status = Job.STATUS_DONE self.finished_jobs.append(job) logging.info('Worker<{}>: Job done {}, starting syncing local files'.format(self, job)) Thread(target=self.pull_from, args=(job.local_project_path, job.remote_project_path)).start() if len(self.job_queue) == 0: stop = True elif event_data[1][:4] == 'Fra:': self.job_queue[0].current_frame = int(event_data[1][4:]) if stop: status_output.close() self.idle = True return def __eq__(self, other): if isinstance(other, str): return other == self.authority return other.authority == self.authority def __str__(self): return self.authority def __hash__(self): return self.authority.__hash__() class RemoteSynchronizer: @classmethod def push_to_worker(cls, worker, local_path, remote_path): address = worker.authority.split(':')[0] destination = "{}:{}".format(address, remote_path) return cls._do_sync(local_path, destination, ['*.blend1', 'README.md']) @classmethod def pull_from_worker(cls, worker, local_path, remote_path): address = worker.authority.split(':')[0] source = "{}:{}".format(address, remote_path) return cls._do_sync(source, local_path, ['*.blend']) @classmethod def _do_sync(cls, source, destination, exclude_list): args = ['rsync', '-azP', '--info=progress2'] for exclude_patern in exclude_list: args.append('--exclude') args.append(exclude_patern) args.append(source) args.append(destination) p = Popen(args, stdout=PIPE, stderr=STDOUT, universal_newlines=True, bufsize=1) last_line = '' while True: line = p.stdout.readline() if line == '': if p.poll() is not None: break elif line != last_line: logging.debug('Synch: stdout: {}'.format(line)) last_line = line status = cls.handleLine(line) if status is not None: yield status REGEX_PUSHPULL_STATUS = "\d+%" @classmethod def handleLine(cls, line, current_file=None): if line == 'end': return line t = re.search(cls.REGEX_PUSHPULL_STATUS, line) if t is not None: return t[0] def get_status(): workers_status = {} subjobs_status = {} for worker in Worker.worker_pool: for subjob in worker.finished_jobs: jobkey = (subjob.blend_file, subjob.scene) if jobkey not in subjobs_status: subjobs_status[jobkey] = [] subjobs_status[jobkey].append(subjob) job_queue_status = [] for subjob in worker.job_queue: job_queue_status.append(get_subjob_stat(subjob)) jobkey = (subjob.blend_file, subjob.scene) if jobkey not in subjobs_status: subjobs_status[jobkey] = [] subjobs_status[jobkey].append(subjob) workers_status[worker.authority] = { 'upload_status': worker.upload_status, 'download_status':worker.download_status, 'job_queue': job_queue_status } jobs_status = {} for job, subjobs in subjobs_status.items(): total_frame = 0 frame_done = 0 for subjob in subjobs: total_frame += subjob.end_frame - subjob.start_frame + 1 frame_done += subjob.current_frame - subjob.start_frame + 1 jobs_status['>'.join(job)] = {'frame_done': frame_done, 'total_frame': total_frame} return { 'project_path': PROJECT_PATH, 'workers': workers_status, 'jobs': jobs_status } def get_subjob_stat(subjob): return { 'name': str(subjob), 'current_frame': subjob.current_frame, 'total_frame': subjob.end_frame - subjob.start_frame + 1 } class JobDispatcher: @staticmethod def process_workpile(worker_list, project_path, workpile): checked_worker_list = [] remote_paths = {} for worker_param in worker_list: worker = Worker.get_worker(worker_param[0]) if worker.check_online(): checked_worker_list.append((worker, worker_param[1],)) dispatched_jobs = {w: [] for w in checked_worker_list} for blend_file, scene in workpile: dispatch = JobDispatcher.dispatch_scene_to_workers( checked_worker_list, project_path, blend_file, scene) for worker, job in dispatch.items(): dispatched_jobs[worker].append(job) for (worker, joblist) in dispatched_jobs.items(): worker[0].process_joblist(project_path, worker[1], joblist) return dispatched_jobs @staticmethod def dispatch_scene_to_workers(worker_list, project_path, blend_file, scene): blend = bf.open_blend(blend_file) scenes_info = retrieve_blenderfile_scenes_info(blend) if scene not in scenes_info: raise ValueError start_frame, end_frame = scenes_info[scene] undispatched_frames = end_frame - start_frame + 1 available_workers = len(worker_list) next_start_frame = start_frame round_batch = False job_dispatch = {} for worker, remote_project_path in worker_list: if not round_batch and undispatched_frames % available_workers != 0: batch = math.ceil(undispatched_frames / available_workers) elif not round_batch: batch = undispatched_frames // available_workers round_batch = True job = Job( blend_file.replace(project_path, remote_project_path), scene, next_start_frame, next_start_frame + batch - 1, project_path, remote_project_path) job_dispatch[(worker, remote_project_path,)] = job next_start_frame += batch undispatched_frames -= batch available_workers -= 1 return job_dispatch def main(): host = '' port = 8026 root = Index() root.putChild(b'status', StatusResource()) root.putChild(b'workpile', WorkpileResource()) # root.putChild(b'cancel', CancelResource) site = server.Site(root) reactor.listenTCP(port, site) logging.info('Starting server {}:{}'.format(host, port)) reactor.run() # worker_list = [('localhost:8025', '/home/ewandor/projects/dev/blender/PushBlendPull/',)] # project_path = '/home/ewandor/projects/dev/blender/PushBlendPull/' # dispatch_scene_to_workers( # worker_list, # project_path, # '/home/ewandor/projects/dev/blender/PushBlendPull/tests/pitou.blend', # 'thom' # ) def retrieve_blendfile_assets(blendfile): assets_dna_type = ['Image', 'bSound', 'MovieClip'] assets = [] for block in blendfile.blocks: if block.dna_type_name in assets_dna_type and block.get(b'name') != '': assets.append(block.get(b'name')) return assets def retrieve_blenderfile_scenes_info(blendfile): scenes_info = {} scenes = [block for block in blendfile.blocks if block.code == b'SC'] for scene in scenes: scenes_info[scene.get((b'id', b'name'))[2:]] = ( scene.get((b'r', b'sfra')), scene.get((b'r', b'efra')), ) return scenes_info # blend1 = bf.open_blend('tests/test_blendfile.blend1') # blend = bf.open_blend('tests/pitou.blend') # # seqs = [block for block in blend.blocks if block.dna_type_name == 'Sequence'] # # for block in blend.blocks: # print(block.code, '-', block.dna_type_name) # print('coucou') # for block in blend1.blocks: # print(block.code, '-', block.dna_type_name) # # assets = retrieve_blendfile_assets(blend) # scenes_info = retrieve_blenderfile_scenes_info(blend) # # # print(scenes_info[2][0]) def get_index_content(): return '''
farms
hawat:8025
scenes
//tests/pitou.blend>pitou
//tests/pitou.blend>thom
//tests/thom.blend>Scene
project_path
'''.encode() if __name__ == "__main__": main()