[mkgmap-dev] [PATCH 14/14] New thread design.
From Jeffrey C. Ollie jeff at ocjtech.us on Thu Sep 9 21:12:14 BST 2010
From: Scott Crosby <scrosby at cs.rice.edu> --- src/uk/me/parabola/splitter/SplitProcessor.java | 132 ++++++++++------------- 1 files changed, 56 insertions(+), 76 deletions(-) diff --git a/src/uk/me/parabola/splitter/SplitProcessor.java b/src/uk/me/parabola/splitter/SplitProcessor.java index 4f7220e..71957d9 100644 --- a/src/uk/me/parabola/splitter/SplitProcessor.java +++ b/src/uk/me/parabola/splitter/SplitProcessor.java @@ -18,9 +18,7 @@ import java.util.Arrays; import java.util.BitSet; import java.util.Collections; import java.util.Date; -import java.util.HashMap; -import java.util.Map.Entry; -import java.util.TreeMap; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -30,15 +28,18 @@ import uk.me.parabola.splitter.Relation.Member; * Splits a map into multiple areas. */ class SplitProcessor implements MapProcessor { - + public static final int NO_ELEMENTS = 10; + public static final int BUNDLE_SIZE = 2000; + + private final SplitIntMap coords = new SplitIntMap(); private final SplitIntMap ways = new SplitIntMap(); private final IntObjMap<long[]> bigWays = new IntObjMap<long[]>(); private final OSMWriter[] writers; - private final BlockingQueue<Element>[] writerInputQueues; - private final BlockingQueue<InputQueueInfo> writerInputQueue; - private final ArrayList<Thread> workerThreads; + private final BlockingQueue<List<Element>>[] writerInputQueues; + private final List<Element>[] bundlingQueues; + Thread threads[]; private int currentNodeAreaSet; private ArrayList<Integer> currentWayAreaSet, tmpWayAreaSet; @@ -86,24 +87,18 @@ class SplitProcessor implements MapProcessor { this.writers = writers; makeWriterMap(); this.maxThreads = maxThreads; - this.writerInputQueue = new ArrayBlockingQueue<InputQueueInfo>(writers.length); this.writerInputQueues = new BlockingQueue[writers.length]; + this.bundlingQueues = new ArrayList[writers.length]; + this.threads = new Thread[writers.length]; for (int i = 0; i < writerInputQueues.length;i++) { - writerInputQueues[i] = new ArrayBlockingQueue<Element>(NO_ELEMENTS); - writerInputQueue.add(new InputQueueInfo(this.writers[i], writerInputQueues[i])); + writerInputQueues[i] = new ArrayBlockingQueue<List<Element>>(NO_ELEMENTS); + bundlingQueues[i] = new ArrayList<Element>(BUNDLE_SIZE); + threads[i] = new Thread(new OSMWriterWorker(writers[i],writerInputQueues[i])); + threads[i].start(); } tmpWayAreaSet = new ArrayList<Integer>(10); currentWayAreaSet = new ArrayList<Integer>(10); currentRelAreaSet = new BitSet(writers.length); - - int noOfWorkerThreads = this.maxThreads - 1; - workerThreads = new ArrayList<Thread>(noOfWorkerThreads); - for (int i = 0; i < noOfWorkerThreads; i++) { - Thread worker = new Thread(new OSMWriterWorker()); - worker.setName("worker-" + i); - workerThreads.add(worker); - worker.start(); - } } @Override @@ -220,20 +215,18 @@ class SplitProcessor implements MapProcessor { @Override public void endMap() { - for (int i = 0; i < writerInputQueues.length; i++) { - try { - writerInputQueues[i].put(STOP_ELEMENT); - } catch (InterruptedException e) { - throw new RuntimeException("Failed to add the stop element for worker thread " + i, e); - } - } - for (Thread workerThread : workerThreads) { - try { - workerThread.join(); + try { + // Push the stop element into every queue. + for (int i = 0 ; i < threads.length ; i++ ) + addToWorkingQueue(i,STOP_ELEMENT); + // Wait for them to all exit. + for (int i = 0 ; i < threads.length ; i++ ) + threads[i].join(); } catch (InterruptedException e) { - throw new RuntimeException("Failed to join for thread " + workerThread.getName(), e); + // TODO Auto-generated catch block + e.printStackTrace(); } - } + for (OSMWriter writer : writers) { writer.finishWrite(); } @@ -350,29 +343,34 @@ class SplitProcessor implements MapProcessor { } private void addToWorkingQueue(int writerNumber, Element element) { + List<Element> bundle=bundlingQueues[writerNumber]; + bundle.add(element); + if (bundle.size() < BUNDLE_SIZE && element != STOP_ELEMENT) + return; try { - writerInputQueues[writerNumber].put(element); + BlockingQueue<List<Element>> queue = writerInputQueues[writerNumber]; + queue.put(bundle); + bundlingQueues[writerNumber] = new ArrayList<Element>(BUNDLE_SIZE); } catch (InterruptedException e) { throw new RuntimeException("Failed to write node " + element.getId() + " to worker thread " + writerNumber, e); } } - private static class InputQueueInfo { - private final OSMWriter writer; - private final BlockingQueue<Element> inputQueue; - - public InputQueueInfo(OSMWriter writer, BlockingQueue<Element> inputQueue) { - this.writer = writer; - this.inputQueue = inputQueue; - } - } private static final Element STOP_ELEMENT = new Element(); - public static final int NO_ELEMENTS = 1000; private class OSMWriterWorker implements Runnable { + + private OSMWriter writer; + private BlockingQueue<List<Element>> queue; + + public OSMWriterWorker(OSMWriter writer, BlockingQueue<List<Element>> queue) { + this.writer = writer; + this.queue = queue; + } + public void processElement(Element element, OSMWriter writer) throws IOException { if (element instanceof Node) { writer.write((Node) element); @@ -385,41 +383,23 @@ class SplitProcessor implements MapProcessor { @Override public void run() { - boolean finished = false; - while (!finished) { - InputQueueInfo workPackage = writerInputQueue.poll(); - if (workPackage==null) { - finished=true; - } else { - while (!workPackage.inputQueue.isEmpty()) { - Element element =null; - try { - element = workPackage.inputQueue.poll(); - if (element == null) { - writerInputQueue.put(workPackage); - workPackage=null; - break; - } else if (element == STOP_ELEMENT) { - workPackage=null; - break; - } else { - processElement(element, workPackage.writer); - } - - } catch (InterruptedException e) { - throw new RuntimeException("Thread " + Thread.currentThread().getName() + " failed to get next element", e); - } catch (IOException e) { - throw new RuntimeException("Thread " + Thread.currentThread().getName() + " failed to write element " + element.getId() + '(' + element.getClass().getSimpleName() + ')', e); - } - } - if (workPackage != null) { - try { - writerInputQueue.put(workPackage); - } catch (InterruptedException e) { - throw new RuntimeException("Thread " + Thread.currentThread().getName() + " failed to return work package", e); - } - } + while (true) { + //System.out.println("Doing loop"); + try { + + List<Element> elements = queue.take(); + for (Element element : elements) + if (element == STOP_ELEMENT) + return; + else + processElement(element, writer); + + } catch (InterruptedException e) { + throw new RuntimeException("Thread " + Thread.currentThread().getName() + " failed to get next element", e); + } catch (IOException e) { + throw new RuntimeException("Thread " + Thread.currentThread().getName() + " failed to write element ",e); } + Thread.yield(); } System.out.println("Thread " + Thread.currentThread().getName() + " has finished"); } -- 1.7.2.3
- Previous message: [mkgmap-dev] [PATCH 13/14] Write the char directly into the buffer, we know its long enough already.
- Next message: [mkgmap-dev] Binary OSM file support?
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the mkgmap-dev mailing list