From c5127d9bb64e059edbc903172decf50bccdb2c61 Mon Sep 17 00:00:00 2001 From: Denvi Date: Sun, 17 Jul 2016 12:23:42 +0500 Subject: [PATCH] Multi-thread plotting --- FlatCAM.py | 10 ++- FlatCAMApp.py | 104 +++++++++++++------------ FlatCAMPool.py | 28 +++++++ FlatCAMWorker.py | 13 ++-- FlatCAMWorkerStack.py | 44 +++++++++++ VisPyVisuals.py | 13 +++- tests/worker_stack/__init__.py | 0 tests/worker_stack/test_workerStack.py | 54 +++++++++++++ 8 files changed, 204 insertions(+), 62 deletions(-) create mode 100644 FlatCAMPool.py create mode 100644 FlatCAMWorkerStack.py create mode 100644 tests/worker_stack/__init__.py create mode 100644 tests/worker_stack/test_workerStack.py diff --git a/FlatCAM.py b/FlatCAM.py index 46cfc30..983921d 100644 --- a/FlatCAM.py +++ b/FlatCAM.py @@ -1,5 +1,5 @@ import sys -from PyQt4 import QtGui +from PyQt4 import QtGui, QtCore from FlatCAMApp import App @@ -19,6 +19,8 @@ debug_trace() # QtCore.QCoreApplication.setAttribute(QtCore.Qt.AA_X11InitThreads) # NOTE: Never talk to the GUI from threads! This is why I commented the above. -app = QtGui.QApplication(sys.argv) -fc = App() -sys.exit(app.exec_()) +if __name__ == '__main__': + app = QtGui.QApplication(sys.argv) + fc = App() + sys.exit(app.exec_()) + diff --git a/FlatCAMApp.py b/FlatCAMApp.py index 92ab2ba..279f647 100644 --- a/FlatCAMApp.py +++ b/FlatCAMApp.py @@ -1,23 +1,16 @@ -import sys, traceback -import urllib -import getopt -import random -import logging -import simplejson as json -import re -import webbrowser -import os import Tkinter -from PyQt4 import QtCore +import getopt +import os +import random import time # Just used for debugging. Double check before removing. -from xml.dom.minidom import parseString as parse_xml_string +import urllib +import webbrowser from contextlib import contextmanager -from vispy.geometry import Rect +from xml.dom.minidom import parseString as parse_xml_string ######################################## ## Imports part of FlatCAM ## ######################################## -from FlatCAMWorker import Worker from ObjectCollection import * from FlatCAMObj import * from PlotCanvas import * @@ -26,6 +19,7 @@ from FlatCAMCommon import LoudDict from FlatCAMShell import FCShell from FlatCAMDraw import FlatCAMDraw from FlatCAMProcess import * +from FlatCAMWorkerStack import WorkerStack from MeasurementTool import Measurement from DblSidedTool import DblSidedTool import tclCommands @@ -464,26 +458,34 @@ class App(QtCore.QObject): self.ui.options_scroll_area.verticalScrollBar().sizeHint().width()) #### Worker #### - App.log.info("Starting Worker...") - self.worker = Worker(self) - self.thr1 = QtCore.QThread() - self.worker.moveToThread(self.thr1) - self.connect(self.thr1, QtCore.SIGNAL("started()"), self.worker.run) - self.thr1.start() + # App.log.info("Starting Worker...") + # self.worker = Worker(self) + # self.thr1 = QtCore.QThread() + # self.worker.moveToThread(self.thr1) + # self.connect(self.thr1, QtCore.SIGNAL("started()"), self.worker.run) + # self.thr1.start() #### Check for updates #### # Separate thread (Not worker) - App.log.info("Checking for updates in backgroud (this is version %s)." % str(self.version)) + # App.log.info("Checking for updates in backgroud (this is version %s)." % str(self.version)) - self.worker2 = Worker(self, name="worker2") - self.thr2 = QtCore.QThread() - self.worker2.moveToThread(self.thr2) - self.connect(self.thr2, QtCore.SIGNAL("started()"), self.worker2.run) + # self.worker2 = Worker(self, name="worker2") + # self.thr2 = QtCore.QThread() + # self.worker2.moveToThread(self.thr2) + # self.connect(self.thr2, QtCore.SIGNAL("started()"), self.worker2.run) # self.connect(self.thr2, QtCore.SIGNAL("started()"), # lambda: self.worker_task.emit({'fcn': self.version_check, # 'params': [], # 'worker_name': "worker2"})) - self.thr2.start() + # self.thr2.start() + + # Create multiprocess pool + # self.pool = WorkerPool() + # self.worker_task.connect(self.pool.add_task) + + self.workers = WorkerStack() + self.worker_task.connect(self.workers.add_task) + ### Signal handling ### ## Custom signals @@ -2262,9 +2264,9 @@ class App(QtCore.QObject): def obj_init(obj_inst, app_inst): obj_inst.from_dict(obj) App.log.debug(obj['kind'] + ": " + obj['options']['name']) - self.new_object(obj['kind'], obj['options']['name'], obj_init, active=False, fit=False, plot=False) + self.new_object(obj['kind'], obj['options']['name'], obj_init, active=False, fit=False, plot=True) - self.plot_all() + # self.plot_all() self.inform.emit("Project loaded from: " + filename) App.log.debug("Project loaded") @@ -2318,29 +2320,33 @@ class App(QtCore.QObject): """ self.log.debug("plot_all()") - self.progress.emit(10) + for obj in self.collection.get_list(): + self.object_created.emit(obj, True) - def worker_task(app_obj): - percentage = 0.1 - try: - delta = 0.9 / len(self.collection.get_list()) - except ZeroDivisionError: - self.progress.emit(0) - return - for obj in self.collection.get_list(): - with self.proc_container.new("Plotting"): - obj.plot() - app_obj.object_plotted.emit(obj) - - percentage += delta - self.progress.emit(int(percentage*100)) - - self.progress.emit(0) - self.plots_updated.emit() - - # Send to worker - #self.worker.add_task(worker_task, [self]) - self.worker_task.emit({'fcn': worker_task, 'params': [self]}) + # self.progress.emit(10) + # + # def worker_task(app_obj): + # print "worker task" + # percentage = 0.1 + # try: + # delta = 0.9 / len(self.collection.get_list()) + # except ZeroDivisionError: + # self.progress.emit(0) + # return + # for obj in self.collection.get_list(): + # with self.proc_container.new("Plotting"): + # obj.plot() + # app_obj.object_plotted.emit(obj) + # + # percentage += delta + # self.progress.emit(int(percentage*100)) + # + # self.progress.emit(0) + # self.plots_updated.emit() + # + # # Send to worker + # #self.worker.add_task(worker_task, [self]) + # self.worker_task.emit({'fcn': worker_task, 'params': [self]}) def register_folder(self, filename): self.defaults["last_folder"] = os.path.split(str(filename))[0] diff --git a/FlatCAMPool.py b/FlatCAMPool.py new file mode 100644 index 0000000..f731847 --- /dev/null +++ b/FlatCAMPool.py @@ -0,0 +1,28 @@ +from PyQt4 import QtCore +from multiprocessing import Pool +import dill + +def run_dill_encoded(what): + fun, args = dill.loads(what) + print "load", fun, args + return fun(*args) + +def apply_async(pool, fun, args): + print "...", fun, args + print "dumps", dill.dumps((fun, args)) + return pool.map_async(run_dill_encoded, (dill.dumps((fun, args)),)) + +def func1(): + print "func" + +class WorkerPool(QtCore.QObject): + + def __init__(self): + super(WorkerPool, self).__init__() + self.pool = Pool(2) + + def add_task(self, task): + print "adding task", task + # task['fcn'](*task['params']) + # print self.pool.map(task['fcn'], task['params']) + apply_async(self.pool, func1, ()) diff --git a/FlatCAMWorker.py b/FlatCAMWorker.py index 8c13f4b..cd2423e 100644 --- a/FlatCAMWorker.py +++ b/FlatCAMWorker.py @@ -9,6 +9,7 @@ class Worker(QtCore.QObject): # avoid multiple tests for debug availability pydevd_failed = False + task_completed = QtCore.pyqtSignal(str) def __init__(self, app, name=None): super(Worker, self).__init__() @@ -31,7 +32,7 @@ class Worker(QtCore.QObject): def run(self): - self.app.log.debug("Worker Started!") + # self.app.log.debug("Worker Started!") self.allow_debug() @@ -40,19 +41,19 @@ class Worker(QtCore.QObject): def do_worker_task(self, task): - self.app.log.debug("Running task: %s" % str(task)) + # self.app.log.debug("Running task: %s" % str(task)) self.allow_debug() if ('worker_name' in task and task['worker_name'] == self.name) or \ - ('worker_name' not in task and self.name is None): + ('worker_name' not in task and self.name is None): try: task['fcn'](*task['params']) except Exception as e: self.app.thread_exception.emit(e) raise e + finally: + self.task_completed.emit(self.name) - return - - self.app.log.debug("Task ignored.") + # self.app.log.debug("Task ignored.") diff --git a/FlatCAMWorkerStack.py b/FlatCAMWorkerStack.py new file mode 100644 index 0000000..cc9a7d7 --- /dev/null +++ b/FlatCAMWorkerStack.py @@ -0,0 +1,44 @@ +from PyQt4 import QtCore +from FlatCAMWorker import Worker +import multiprocessing + + +class WorkerStack(QtCore.QObject): + + worker_task = QtCore.pyqtSignal(dict) # 'worker_name', 'func', 'params' + thread_exception = QtCore.pyqtSignal(object) + + def __init__(self): + super(WorkerStack, self).__init__() + + self.workers = [] + self.threads = [] + self.load = {} # {'worker_name': tasks_count} + + # Create workers crew + for i in range(0, multiprocessing.cpu_count()): + worker = Worker(self, 'Slogger-' + str(i)) + thread = QtCore.QThread() + + worker.moveToThread(thread) + worker.connect(thread, QtCore.SIGNAL("started()"), worker.run) + worker.task_completed.connect(self.on_task_completed) + + thread.start() + + self.workers.append(worker) + self.threads.append(thread) + self.load[worker.name] = 0 + + def __del__(self): + for thread in self.threads: + thread.terminate() + + def add_task(self, task): + print "New task", self.load + worker_name = min(self.load, key=self.load.get) + self.load[worker_name] += 1 + self.worker_task.emit({'worker_name': worker_name, 'fcn': task['fcn'], 'params': task['params']}) + + def on_task_completed(self, worker_name): + self.load[str(worker_name)] -= 1 diff --git a/VisPyVisuals.py b/VisPyVisuals.py index e04de53..700c587 100644 --- a/VisPyVisuals.py +++ b/VisPyVisuals.py @@ -4,6 +4,7 @@ from vispy.gloo import set_state from vispy.geometry.triangulation import Triangulation from vispy.color import Color from shapely.geometry import Polygon, LineString, LinearRing +import threading import numpy as np try: @@ -110,6 +111,7 @@ class ShapeCollectionVisual(CompoundVisual): """ self.data = {} self.last_key = -1 + self.lock = threading.Lock() self._meshes = [MeshVisual() for _ in range(0, layers)] self._lines = [LineVisual(antialias=True) for _ in range(0, layers)] @@ -147,16 +149,21 @@ class ShapeCollectionVisual(CompoundVisual): :return: int Index of shape """ + self.lock.acquire(True) self.last_key += 1 + key = self.last_key + self.lock.release() - self.data[self.last_key] = {'geometry': shape, 'color': color, 'face_color': face_color, + self.data[key] = {'geometry': shape, 'color': color, 'face_color': face_color, 'visible': visible, 'layer': layer} - self.update_shape_buffers(self.last_key) + + # TODO: Make in separate process -> data[key] (dict) + self.update_shape_buffers(key) if update: self._update() - return self.last_key + return key def update_shape_buffers(self, key): """ diff --git a/tests/worker_stack/__init__.py b/tests/worker_stack/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/worker_stack/test_workerStack.py b/tests/worker_stack/test_workerStack.py new file mode 100644 index 0000000..f8ffea7 --- /dev/null +++ b/tests/worker_stack/test_workerStack.py @@ -0,0 +1,54 @@ +from unittest import TestCase +from FlatCAMWorkerStack import WorkerStack +from PyQt4 import QtCore, QtGui +import random +from multiprocessing import Process, Queue +from shapely.geometry import LineString +import time + + +def calc(line, q): + res = q.get() + res += list(line.coords) + + res = [res for _ in range(3)] + + q.put(res) + # a = [x ** x for x in range(0, random.randint(5000, 10000))] + print "process ended" + return + + +class TestWorkerStack(TestCase): + + def test_create(self): + a = QtGui.QApplication([]) + + s = WorkerStack() + time.sleep(1) + + for i in range(0, 4): + s.add_task({'fcn': self.task, 'params': [i]}) + + # self.sleep(1000) + # + # for i in range(0, 8): + # s.add_task({'fcn': self.task, 'params': [i + 8]}) + + time.sleep(2) + + def task(self, id): + print "Task", id, "started" + + line = LineString([(0, 0), (1, 1), (1, 0), (0, 0)]) + out = [(9, 9)] + + q = Queue() + q.put(out) + + p = Process(target=calc, args=(line, q)) + p.start() + p.join() + + out = q.get() + print "Task", id, "ended", out