Subversion Repositories splitter

Rev

Rev 278 | Blame | Compare with Previous | Last modification | View Log | RSS feed

/*
 * Copyright (c) 2009.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 3 as
 * published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 */

package uk.me.parabola.splitter;

import uk.me.parabola.splitter.Relation.Member;

import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Splits a map into multiple areas.
 */

class SplitProcessor extends AbstractMapProcessor {
        private final OSMWriter[] writers;

        private SparseLong2ShortMapFunction coords;
        private SparseLong2ShortMapFunction ways;      
        private final WriterDictionaryShort writerDictionary;
        private final DataStorer dataStorer;
        private final Long2IntClosedMapFunction nodeWriterMap;
        private final Long2IntClosedMapFunction wayWriterMap;
        private final Long2IntClosedMapFunction relWriterMap;

        //      for statistics
        private long countQuickTest = 0;
        private long countFullTest = 0;
        private long countCoords = 0;
        private long countWays = 0;
        private final int writerOffset;
        private final int lastWriter;
        private WriterIndex writerIndex;
        private final int maxThreads;
        private final short unassigned = Short.MIN_VALUE;

        private final InputQueueInfo[] writerInputQueues;
        private final BlockingQueue<InputQueueInfo> toProcess;
        private final ArrayList<Thread> workerThreads;
        private final InputQueueInfo STOP_MSG = new InputQueueInfo(null);


        // private int currentNodeAreaSet;
        private BitSet currentWayAreaSet;
        private BitSet currentRelAreaSet;
        private BitSet usedWriters;
       
       
        SplitProcessor(DataStorer dataStorer,
                        int writerOffset, int numWritersThisPass, int maxThreads){
                this.dataStorer = dataStorer;
                this.writerDictionary = dataStorer.getWriterDictionary();
                this.writers = writerDictionary.getWriters();
                this.coords = new SparseLong2ShortMapInline();
                this.ways   = new SparseLong2ShortMapInline();
                this.coords.defaultReturnValue(unassigned);
                this.ways.defaultReturnValue(unassigned);              
                this.writerIndex = dataStorer.getGrid();
                this.countWays = ways.size();
                this.writerOffset = writerOffset;
                this.lastWriter = writerOffset + numWritersThisPass-1;
                this.maxThreads = maxThreads;
                this.toProcess = new ArrayBlockingQueue<InputQueueInfo>(numWritersThisPass);
                this.writerInputQueues = new InputQueueInfo[numWritersThisPass];
                for (int i = 0; i < writerInputQueues.length; i++) {
                        writerInputQueues[i] = new InputQueueInfo(this.writers[i + writerOffset]);
                        writers[i + writerOffset].initForWrite();
                }
                nodeWriterMap = dataStorer.getWriterMap(DataStorer.NODE_TYPE);
                wayWriterMap = dataStorer.getWriterMap(DataStorer.WAY_TYPE);
                relWriterMap = dataStorer.getWriterMap(DataStorer.REL_TYPE);
                currentWayAreaSet = new BitSet(writers.length);
                currentRelAreaSet = new BitSet(writers.length);
                usedWriters = new BitSet();

                int noOfWorkerThreads = Math.min(this.maxThreads - 1, numWritersThisPass);
                workerThreads = new ArrayList<Thread>(noOfWorkerThreads);
                for (int i = 0; i < noOfWorkerThreads; i++) {
                        Thread worker = new Thread(new OSMWriterWorker());
                        worker.setName("worker-" + i);
                        workerThreads.add(worker);
                        worker.start();
                }
               
        }

        @Override
        public void processNode(Node n) {
                try {
                        writeNode(n);
                } catch (IOException e) {
                        throw new RuntimeException("failed to write node " + n.getId(), e);
                }
        }

        @Override
        public void processWay(Way w) {
                currentWayAreaSet.clear();
                int multiTileWriterIdx = (wayWriterMap != null) ? wayWriterMap.getSeq(w.getId()): WriterDictionaryInt.UNASSIGNED;
                if (multiTileWriterIdx != WriterDictionaryInt.UNASSIGNED){
                        BitSet cl = dataStorer.getMultiTileWriterDictionary().getBitSet(multiTileWriterIdx);
                        // set only active writer bits
                        for(int i=cl.nextSetBit(writerOffset); i>=0 && i <= lastWriter; i=cl.nextSetBit(i+1)){
                                currentWayAreaSet.set(i);
                        }
                        //System.out.println("added or completed way: " +  w.getId());
                }
                else{
                        short oldclIndex = unassigned;
                        //for (long id : w.getRefs()) {
                        int refs = w.getRefs().size();
                        for (int i = 0; i < refs; i++){
                                long id = w.getRefs().getLong(i);
                                // Get the list of areas that the way is in.
                                short clIdx = coords.get(id);
                                if (clIdx != unassigned){
                                        if (oldclIndex != clIdx){
                                                BitSet cl = writerDictionary.getBitSet(clIdx);
                                                currentWayAreaSet.or(cl);
                                                oldclIndex = clIdx;
                                        }
                                }
                        }
                }
                if (!currentWayAreaSet.isEmpty()){
                        // store these areas in ways map
                        short idx = writerDictionary.translate(currentWayAreaSet);
                        ways.put(w.getId(), idx);
                        ++countWays;
                        if (countWays % 1000000 == 0){
                                System.out.println("MAP occupancy: " + Utils.format(countWays) + ", number of area dictionary entries: " + writerDictionary.size() + " of " + ((1<<16) - 1));
                                ways.stats(0);
                        }
                }
                try {
                        writeWay(w);
                } catch (IOException e) {
                        throw new RuntimeException("failed to write way " + w.getId(), e);
                }
        }

        @Override
        public void processRelation(Relation rel) {
                currentRelAreaSet.clear();
                int multiTileWriterIdx = (relWriterMap != null) ? relWriterMap.getSeq(rel.getId()): WriterDictionaryInt.UNASSIGNED;
                if (multiTileWriterIdx != WriterDictionaryInt.UNASSIGNED){
                       
                        BitSet cl = dataStorer.getMultiTileWriterDictionary().getBitSet(multiTileWriterIdx);
                        try {
                                // set only active writer bits
                                for(int i=cl.nextSetBit(writerOffset); i>=0 && i <= lastWriter; i=cl.nextSetBit(i+1)){
                                        currentRelAreaSet.set(i);
                                }
                                writeRelation(rel);
                                //System.out.println("added rel: " +  r.getId());
                        } catch (IOException e) {
                                throw new RuntimeException("failed to write relation " + rel.getId(),
                                                e);
                        }
                }
                else{
                        short oldclIndex = unassigned;
                        short oldwlIndex = unassigned;
                        try {
                                for (Member mem : rel.getMembers()) {
                                        // String role = mem.getRole();
                                        long id = mem.getRef();
                                        if (mem.getType().equals("node")) {
                                                short clIdx = coords.get(id);

                                                if (clIdx != unassigned){
                                                        if (oldclIndex != clIdx){
                                                                BitSet wl = writerDictionary.getBitSet(clIdx);
                                                                currentRelAreaSet.or(wl);
                                                        }
                                                        oldclIndex = clIdx;

                                                }

                                        } else if (mem.getType().equals("way")) {
                                                short wlIdx = ways.get(id);

                                                if (wlIdx != unassigned){
                                                        if (oldwlIndex != wlIdx){
                                                                BitSet wl = writerDictionary.getBitSet(wlIdx);
                                                                currentRelAreaSet.or(wl);
                                                        }
                                                        oldwlIndex = wlIdx;
                                                }
                                        }
                                }

                                writeRelation(rel);
                        } catch (IOException e) {
                                throw new RuntimeException("failed to write relation " + rel.getId(),
                                                e);
                        }
                }
        }
        @Override
        public boolean endMap() {
                System.out.println("Statistics for coords map:");
                coords.stats(1);
                System.out.println("");
                System.out.println("Statistics for ways map:");
                ways.stats(1);
                Utils.printMem();
                System.out.println("Full Node tests:  " + Utils.format(countFullTest));
                System.out.println("Quick Node tests: " + Utils.format(countQuickTest));               
                coords = null;
                ways = null;

                for (int i = 0; i < writerInputQueues.length; i++) {
                        try {
                                writerInputQueues[i].stop();
                        } catch (InterruptedException e) {
                                throw new RuntimeException(
                                                "Failed to add the stop element for worker thread " + i,
                                                e);
                        }
                }
                try {
                        if (maxThreads > 1)
                                toProcess.put(STOP_MSG);// Magic flag used to indicate that all data is done.

                } catch (InterruptedException e1) {
                        e1.printStackTrace();
                }

                for (Thread workerThread : workerThreads) {
                        try {
                                workerThread.join();
                        } catch (InterruptedException e) {
                                throw new RuntimeException("Failed to join for thread "
                                                + workerThread.getName(), e);
                        }
                }
                for (int i=writerOffset; i<= lastWriter; i++) {
                        writers[i].finishWrite();
                }
                return true;           
        }

        private void writeNode(Node currentNode) throws IOException {
                int countWriters = 0;
                short lastUsedWriter = unassigned;
                WriterGridResult writerCandidates = writerIndex.get(currentNode);
                int multiTileWriterIdx = (nodeWriterMap != null) ? nodeWriterMap.getSeq(currentNode.getId()): WriterDictionaryInt.UNASSIGNED;

                boolean isSpecialNode = (multiTileWriterIdx != WriterDictionaryInt.UNASSIGNED);
                if (writerCandidates == null && !isSpecialNode)  {
                        return;
                }
                if (isSpecialNode || writerCandidates.l.size() > 1)
                        usedWriters.clear();
                if (writerCandidates != null){
                        for (int i = 0; i < writerCandidates.l.size(); i++) {
                                int n = writerCandidates.l.getShort(i);
                                if (n < writerOffset || n > lastWriter)
                                        continue;
                                OSMWriter w = writers[n];
                                boolean found;
                                if (writerCandidates.testNeeded){
                                        found = w.nodeBelongsToThisArea(currentNode);
                                        ++countFullTest;
                                }
                                else{
                                        found = true;
                                        ++countQuickTest;
                                }
                                if (found) {
                                        usedWriters.set(n);
                                        ++countWriters;
                                        lastUsedWriter = (short) n;
                                        if (maxThreads > 1) {
                                                addToWorkingQueue(n, currentNode);
                                        } else {
                                                w.write(currentNode);
                                        }
                                }
                        }
                }
                if (isSpecialNode){
                        // this node is part of a multi-tile-polygon, add it to all tiles covered by the parent
                        BitSet nodeWriters = dataStorer.getMultiTileWriterDictionary().getBitSet(multiTileWriterIdx);
                        for(int i=nodeWriters.nextSetBit(writerOffset); i>=0 && i <= lastWriter; i=nodeWriters.nextSetBit(i+1)){
                                if (usedWriters.get(i) )
                                        continue;
                                if (maxThreads > 1) {
                                        addToWorkingQueue(i, currentNode);
                                } else {
                                        writers[i].write(currentNode);
                                }
                        }
                }
               
                if (countWriters > 0){
                        short writersID;
                        if (countWriters > 1)
                                writersID = writerDictionary.translate(usedWriters);
                        else  
                                writersID = (short) (lastUsedWriter  - WriterDictionaryShort.DICT_START); // no need to do lookup in the dictionary
                        coords.put(currentNode.getId(), writersID);
                        ++countCoords;
                        if (countCoords % 10000000 == 0){
                                System.out.println("MAP occupancy: " + Utils.format(countCoords) + ", number of area dictionary entries: " + writerDictionary.size() + " of " + ((1<<16) - 1));
                                coords.stats(0);
                        }
                }
        }

        private boolean seenWay;

        private void writeWay(Way currentWay) throws IOException {
                if (!seenWay) {
                        seenWay = true;
                        System.out.println("Writing ways " + new Date());
                }
               
                if (!currentWayAreaSet.isEmpty()) {
                                for (int n = currentWayAreaSet.nextSetBit(0); n >= 0; n = currentWayAreaSet.nextSetBit(n + 1)) {
                                        if (maxThreads > 1) {
                                                addToWorkingQueue(n, currentWay);
                                        } else {
                                                writers[n].write(currentWay);
                                        }
                                }
                        }
        }

        private boolean seenRel;

        private void writeRelation(Relation currentRelation) throws IOException {
                if (!seenRel) {
                        seenRel = true;
                        System.out.println("Writing relations " + new Date());
                }
                for (int n = currentRelAreaSet.nextSetBit(0); n >= 0; n = currentRelAreaSet.nextSetBit(n + 1)) {
                        // if n is out of bounds, then something has gone wrong
                        if (maxThreads > 1) {
                                addToWorkingQueue(n, currentRelation);
                        } else {
                                writers[n].write(currentRelation);
                        }
                }
        }

        private void addToWorkingQueue(int writerNumber, Element element) {
                try {
                        writerInputQueues[writerNumber-writerOffset].put(element);
                } catch (InterruptedException e) {
                        // throw new RuntimeException("Failed to write node " +
                        // element.getId() + " to worker thread " + writerNumber, e);
                }
        }

        private class InputQueueInfo {
                private final OSMWriter writer;
                private ArrayList<Element> staging;
                private final BlockingQueue<ArrayList<Element>> inputQueue;

                public InputQueueInfo(OSMWriter writer) {
                        inputQueue =  new ArrayBlockingQueue<ArrayList<Element>>(NO_ELEMENTS);
                        this.writer = writer;
                        this.staging = new ArrayList<Element>(STAGING_SIZE);
                }

                void put(Element e) throws InterruptedException {
                        staging.add(e);
                        if (staging.size() < STAGING_SIZE)
                                return;
                        flush();
                }

                void flush() throws InterruptedException {
                        // System.out.println("Flush");
                        inputQueue.put(staging);
                        staging = new ArrayList<Element>(STAGING_SIZE);
                        toProcess.put(this);
                }

                void stop() throws InterruptedException {
                        flush();
                }
        }

        public static final int NO_ELEMENTS = 3;
        final int STAGING_SIZE = 300;

        private class OSMWriterWorker implements Runnable {

                public void processElement(Element element, OSMWriter writer)
                                throws IOException {
                        if (element instanceof Node) {
                                writer.write((Node) element);
                        } else if (element instanceof Way) {
                                writer.write((Way) element);
                        } else if (element instanceof Relation) {
                                writer.write((Relation) element);
                        }
                }

                @Override
                public void run() {
                        boolean finished = false;
                        while (!finished) {
                                InputQueueInfo workPackage = null;
                                try {
                                        workPackage = toProcess.take();
                                } catch (InterruptedException e1) {
                                        e1.printStackTrace();
                                        continue;
                                }
                                if (workPackage == STOP_MSG) {
                                        try {
                                                toProcess.put(STOP_MSG); // Re-inject it so that other
                                                                                                        // threads know that we're
                                                                                                        // exiting.
                                        } catch (InterruptedException e) {
                                                e.printStackTrace();
                                        }
                                        finished = true;
                                } else {
                                        synchronized (workPackage) {
                                        while (!workPackage.inputQueue.isEmpty()) {
                                                        ArrayList<Element> elements = null;
                                                try {
                                                        elements = workPackage.inputQueue.poll();
                                                                for (Element element : elements) {
                                                                processElement(element, workPackage.writer);
                                                        }

                                                } catch (IOException e) {
                                                                throw new RuntimeException("Thread "
                                                                                + Thread.currentThread().getName()
                                                                                + " failed to write element ", e);
                                                }
                                        }
                                        }

                                }
                        }
                        System.out.println("Thread " + Thread.currentThread().getName()
                                        + " has finished");
                }
        }

}