diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..baed050
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,8 @@
+.git
+example
+npm
+LICENSE
+*.md
+*.pbf
+*.mask
+*.js
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..695bf98
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,36 @@
+# base image
+FROM ubuntu:latest
+
+# configure env
+ENV DEBIAN_FRONTEND 'noninteractive'
+
+# update apt, install core apt dependencies and delete the apt-cache
+# note: this is done in one command in order to keep down the size of intermediate containers
+RUN apt update && \
+    apt install -y locales git-core sqlite3 libsqlite3-mod-spatialite golang && \
+    rm -rf /var/lib/apt/lists/*
+
+# configure locale
+RUN locale-gen 'en_US.UTF-8'
+ENV LANG 'en_US.UTF-8'
+ENV LANGUAGE 'en_US:en'
+ENV LC_ALL 'en_US.UTF-8'
+
+# configure git
+RUN git config --global 'user.email' 'null@null.com'
+RUN git config --global 'user.name' 'Missinglink PBF'
+
+# set GOPATH
+ENV GOPATH='/tmp/go'
+
+# change working dir
+WORKDIR "$GOPATH/src/github.com/missinglink/pbf"
+
+# copy files
+COPY . "$GOPATH/src/github.com/missinglink/pbf"
+
+# fetch dependencies
+RUN go get
+
+# build binary
+RUN go build
diff --git a/command/bitmask_custom.go b/command/bitmask_custom.go
index d71ba24..cc61f5f 100644
--- a/command/bitmask_custom.go
+++ b/command/bitmask_custom.go
@@ -11,6 +11,11 @@ import (
 	"github.com/codegangsta/cli"
 )
 
+// @todo: depending on which elements are defined in the config, we can
+// skip over large chunks of the file. eg: if the config only contains
+// relation patterns then it will be much faster if we skip over the
+// nodes and ways on the first pass
+
 // BitmaskCustom cli command
 func BitmaskCustom(c *cli.Context) error {
 
@@ -22,7 +27,7 @@
 	}
 
 	// create parser
-	parser := parser.NewParser(c.Args()[0])
+	p := parser.NewParser(c.Args()[0])
 
 	// don't clobber existing bitmask file
 	if _, err := os.Stat(c.Args()[1]); err == nil {
@@ -60,7 +65,32 @@ func BitmaskCustom(c *cli.Context) error {
 	defer handle.Masks.WriteToFile(c.Args()[1])
 
 	// Parse will block until it is done or an error occurs.
-	parser.Parse(handle)
+	p.Parse(handle)
+
+	// --- second pass ---
+	// run parser a second time, skipping the nodes
+	// @todo: skip relations on the second pass too
+
+	// if we are not interested in relations, exit now
+	if 0 == len(config.RelationPatterns) {
+		return nil
+	}
+
+	// disable indexing
+	os.Unsetenv("INDEXING")
+
+	// create a new parser
+	p2 := parser.NewParser(c.Args()[0])
+
+	// find first way offset
+	offset, err := p2.GetDecoder().Index.FirstOffsetOfType("way")
+	if nil != err {
+		log.Printf("target type: %s not found in file\n", "way")
+		os.Exit(1)
+	}
+
+	// Parse will block until it is done or an error occurs.
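+	// ParseFrom starts decoding at the supplied byte offset rather than at
+	// the beginning of the file, so the node blocks are skipped entirely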
+	p2.ParseFrom(handle, offset)
 
 	return nil
 }
diff --git a/command/json_flat.go b/command/json_flat.go
index 802d960..b3c0daf 100644
--- a/command/json_flat.go
+++ b/command/json_flat.go
@@ -9,6 +9,7 @@ import (
 	"github.com/missinglink/pbf/lib"
 	"github.com/missinglink/pbf/parser"
 	"github.com/missinglink/pbf/proxy"
+	"github.com/missinglink/pbf/spatialite"
 
 	"github.com/codegangsta/cli"
 )
@@ -50,6 +51,7 @@ func JSONFlat(c *cli.Context) error {
 	var handle = &handler.DenormalizedJSON{
 		Conn:            conn,
 		Writer:          lib.NewBufferedWriter(),
+		Spatialite:      &spatialite.Connection{},
 		ComputeCentroid: c.BoolT("centroid"),
 		ComputeGeohash:  c.Bool("geohash"),
 		ExportLatLons:   c.Bool("vertices"),
@@ -58,17 +60,26 @@ func JSONFlat(c *cli.Context) error {
 	// close the writer routine and flush
 	defer handle.Writer.Close()
 
+	// open the spatialite connection
+	handle.Spatialite.Open(":memory:")
+
+	// close the spatialite connection
+	defer handle.Spatialite.Close()
+
 	// create db writer routine
-	writer := leveldb.NewCoordWriter(conn)
+	writer := leveldb.NewWriter(conn)
 
 	// ensure all node refs are written to disk before starting on the ways
 	dec := p.GetDecoder()
 	dec.Triggers = []func(int, uint64){
 		func(i int, offset uint64) {
-			if 0 == i {
-				log.Println("writer close")
-				writer.Close()
-				log.Println("writer closed")
+			switch i {
+			case 0:
+				writer.NodeQueue.Close()
+				log.Println("nodes written")
+			case 1:
+				writer.WayQueue.Close()
+				log.Println("ways written")
 			}
 		},
 	}
diff --git a/command/store_noderefs.go b/command/store_noderefs.go
index 2508013..3df799d 100644
--- a/command/store_noderefs.go
+++ b/command/store_noderefs.go
@@ -46,16 +46,19 @@ func StoreNodeRefs(c *cli.Context) error {
 	defer conn.Close()
 
 	// create db writer routine
-	writer := leveldb.NewCoordWriter(conn)
+	writer := leveldb.NewWriter(conn)
 
 	// ensure all node refs are written to disk before starting on the ways
 	dec := parser.GetDecoder()
 	dec.Triggers = []func(int, uint64){
 		func(i int, offset uint64) {
-			if 0 == i {
-				log.Println("writer close")
-				writer.Close()
-				log.Println("writer closed")
+			switch i {
+			case 0:
+				writer.NodeQueue.Close()
+				log.Println("nodes written")
+			case 1:
+				writer.WayQueue.Close()
+				log.Println("ways written")
 			}
 		},
 	}
diff --git a/handler/bitmask_custom.go b/handler/bitmask_custom.go
index 900b728..7841ef3 100644
--- a/handler/bitmask_custom.go
+++ b/handler/bitmask_custom.go
@@ -14,6 +14,8 @@ type BitmaskCustom struct {
 
 // ReadNode - called once per node
 func (b *BitmaskCustom) ReadNode(item gosmparse.Node) {
+
+	// nodes in feature list
 	if b.Features.MatchNode(item) {
 		b.Masks.Nodes.Insert(item.ID)
 	}
@@ -21,7 +23,10 @@
 
 // ReadWay - called once per way
 func (b *BitmaskCustom) ReadWay(item gosmparse.Way) {
+
+	// ways in feature list
 	if b.Features.MatchWay(item) {
+
 		b.Masks.Ways.Insert(item.ID)
 
 		// insert dependents in mask
@@ -29,15 +34,77 @@
 			b.Masks.WayRefs.Insert(ref)
 		}
 	}
+
+	// ways belonging to a relation
+	if b.Masks.RelWays.Has(item.ID) {
+
+		// insert dependents in mask
+		for _, ref := range item.NodeIDs {
+			b.Masks.RelNodes.Insert(ref)
+		}
+	}
 }
 
 // ReadRelation - called once per relation
 func (b *BitmaskCustom) ReadRelation(item gosmparse.Relation) {
-	// @todo: relations currently not supported
-	// due to requiring a 'second-pass' to gather the node ids for
-	// each member way
+	if b.Features.MatchRelation(item) {
+
+		// we currently only support the 'multipolygon' and 'boundary' types
+		// see: http://wiki.openstreetmap.org/wiki/Types_of_relation
+		if val, ok := item.Tags["type"]; ok && (val == "multipolygon" || val == "boundary") {
+
+			// detect relation class
+			var isSuperRelation = false
+			var hasNodeCentroid = false
-	// if b.Features.MatchRelation(item) {
-	// 	b.Masks.Relations.Insert(item.ID)
-	// }
+
+			// iterate members once to try to classify the relation
+			for _, member := range item.Members {
+				switch member.Type {
+				case gosmparse.RelationType:
+					isSuperRelation = true
+				case gosmparse.NodeType:
+					switch member.Role {
+					case "label":
+						hasNodeCentroid = true
+					case "admin_centre":
+						hasNodeCentroid = true
+					}
+				}
+			}
+
+			// super relations are relations containing other relations
+			// we currently do not support these due to their complexity
+			if isSuperRelation {
+				return
+			}
+
+			// iterate over relation members
+			for _, member := range item.Members {
+
+				switch member.Type {
+				case gosmparse.NodeType:
+
+					// only store nodes if they are for 'label' or 'admin_centre'
+					if member.Role == "label" || member.Role == "admin_centre" {
+						b.Masks.RelNodes.Insert(member.ID)
+					}
+
+				case gosmparse.WayType:
+
+					// only store ways if we don't have a node centroid
+					if !hasNodeCentroid {
+
+						// skip cyclic references to parent (subarea) and other junk roles
+						if member.Role == "outer" || member.Role == "inner" || member.Role == "" {
+
+							b.Masks.RelWays.Insert(member.ID)
+						}
+					}
+				}
+			}
+
+			// insert relation in mask
+			b.Masks.Relations.Insert(item.ID)
+		}
+	}
 }
diff --git a/handler/denormalized_json.go b/handler/denormalized_json.go
index 3915c2a..40e514a 100644
--- a/handler/denormalized_json.go
+++ b/handler/denormalized_json.go
@@ -1,12 +1,15 @@
 package handler
 
 import (
+	"fmt"
 	"log"
+	"strings"
 
 	"github.com/missinglink/gosmparse"
 	"github.com/missinglink/pbf/json"
 	"github.com/missinglink/pbf/leveldb"
 	"github.com/missinglink/pbf/lib"
+	"github.com/missinglink/pbf/spatialite"
 	"github.com/missinglink/pbf/tags"
 	"github.com/mmcloughlin/geohash"
 )
@@ -15,6 +18,7 @@ type DenormalizedJSON struct {
 	Writer          *lib.BufferedWriter
 	Conn            *leveldb.Connection
+	Spatialite      *spatialite.Connection
 	ComputeCentroid bool
 	ComputeGeohash  bool
 	ExportLatLons   bool
@@ -100,6 +104,129 @@ func (d *DenormalizedJSON) ReadRelation(item gosmparse.Relation) {
 	DeleteTags(item.Tags, uninterestingTags)
 
 	// relation
-	obj := json.RelationFromParser(item)
+	obj := json.DenormalizedRelation{
+		ID:   item.ID,
+		Type: "relation",
+		Tags: item.Tags,
+	}
+
+	// compute polygon centroid
+	if d.ComputeCentroid {
+
+		// iterate members once to try to classify the relation
+		var nodeCentroidID int64
+		var wayIDs []int64
+
+		for _, member := range item.Members {
+			switch member.Type {
+			case gosmparse.NodeType:
+				// only target the 'label' or 'admin_centre' nodes
+				if member.Role == "label" || member.Role == "admin_centre" {
+
+					// store the ID of the node which contains the centroid info
+					nodeCentroidID = member.ID
+				}
+			case gosmparse.WayType:
+				// skip cyclic references to parent (subarea) and other junk roles
+				if member.Role == "outer" || member.Role == "inner" || member.Role == "" {
+
+					// append way ID to list of member ways
+					wayIDs = append(wayIDs, member.ID)
+				}
+			}
+		}
+
+		// this is the simplest relation to build: we simply load the 'label'
+		// or 'admin_centre' node and use its lat/lon as the relation centroid
+		if 0 != nodeCentroidID {
+
+			var node, readError = d.Conn.ReadCoord(nodeCentroidID)
+			if nil != readError {
+				// skip relation if the point is not found in the db
+				log.Printf("skipping relation %d. failed to load admin centre %d\n", item.ID, nodeCentroidID)
+				return
+			}
+
+			// set the centroid
+			obj.Centroid = json.NewLatLon(node.Lat, node.Lon)
+
+		} else {
+			// this is more complex: we need to load all the multipolygon linestrings
+			// from the DB and assemble the geometry before calculating the centroid
+
+			// generate WKT strings as input for 'GeomFromText'
+			var lineStrings []string
+			for _, wayID := range wayIDs {
+
+				// load way from DB
+				var way, readError = d.Conn.ReadPath(wayID)
+				if nil != readError {
+					// skip ways which fail to denormalize
+					log.Printf("skipping relation %d. failed to load way %d\n", item.ID, wayID)
+					return
+				}
+
+				// load vertices from DB
+				var vertices []string
+				for _, ref := range way.NodeIDs {
+					var node, readError = d.Conn.ReadCoord(ref)
+					if nil != readError {
+						// skip ways which fail to denormalize
+						log.Printf("skipping relation %d. failed to load way ref %d\n", item.ID, ref)
+						return
+					}
+
+					vertices = append(vertices, fmt.Sprintf("%f %f", node.Lon, node.Lat))
+				}
+
+				lineStrings = append(lineStrings, fmt.Sprintf("(%s)", strings.Join(vertices, ",")))
+			}
+
+			// build SQL query
+			var query = `SELECT COALESCE( AsText( PointOnSurface( BuildArea( GeomFromText('MULTILINESTRING(`
+			query += strings.Join(lineStrings, ",")
+			query += `)')))),'');`
+
+			// query database for result
+			var res string
+			var err = d.Spatialite.DB.QueryRow(query).Scan(&res)
+			if err != nil {
+				log.Printf("spatialite: failed to assemble relation: %d", item.ID)
+				log.Print(err)
+
+				// // spatialite / GEOS debugging
+				// log.Printf("query: %s", query)
+				//
+				// var errGeos, errAux, errGeom string
+				// d.Spatialite.DB.QueryRow("SELECT COALESCE(GEOS_GetLastErrorMsg(),'')").Scan(&errGeos)
+				// d.Spatialite.DB.QueryRow("SELECT COALESCE(GEOS_GetLastAuxErrorMsg(),'')").Scan(&errAux)
+				// d.Spatialite.DB.QueryRow("SELECT COALESCE(LWGEOM_GetLastErrorMsg(),'')").Scan(&errGeom)
+				// log.Printf("GEOS_GetLastErrorMsg: %s", errGeos)
+				// log.Printf("GEOS_GetLastAuxErrorMsg: %s", errAux)
+				// log.Printf("LWGEOM_GetLastErrorMsg: %s", errGeom)
+
+				return
+			}
+
+			// extract lat/lon values from WKT
+			var lon, lat float64
+			n, _ := fmt.Sscanf(res, "POINT(%f %f)", &lon, &lat)
+
+			// ensure we got 2 floats
+			if 2 != n {
+				log.Printf("spatialite: failed to compute centroid for relation: %d", item.ID)
+				return
+			}
+
+			// set the centroid
+			obj.Centroid = json.NewLatLon(lat, lon)
+		}
+	}
+
+	// compute geohash (requires a centroid)
+	if d.ComputeGeohash && nil != obj.Centroid {
+		obj.Hash = geohash.Encode(obj.Centroid.Lat, obj.Centroid.Lon)
+	}
+
 	d.Writer.Queue <- obj.Bytes()
 }
diff --git a/json/denormalized_relation.go b/json/denormalized_relation.go
new file mode 100644
index 0000000..07d1b99
--- /dev/null
+++ b/json/denormalized_relation.go
@@ -0,0 +1,44 @@
+package json
+
+import (
+	"encoding/json"
+	"fmt"
+
+	"github.com/missinglink/gosmparse"
+)
+
+// DenormalizedRelation struct
+type DenormalizedRelation struct {
+	ID       int64             `json:"id"`
+	Type     string            `json:"type"`
+	Hash     string            `json:"hash,omitempty"`
+	Tags     map[string]string `json:"tags,omitempty"`
+	Centroid *LatLon           `json:"centroid,omitempty"`
+}
+
+// Print json
+func (rel DenormalizedRelation) Print() {
+	json, _ := json.Marshal(rel)
+	fmt.Println(string(json))
+}
+
+// PrintIndent json indented
+func (rel DenormalizedRelation) PrintIndent() {
+	json, _ := json.MarshalIndent(rel, "", "  ")
+	fmt.Println(string(json))
+}
+
+// Bytes - return json
+func (rel DenormalizedRelation) Bytes() []byte {
+	json, _ := json.Marshal(rel)
+	return json
+}
+
+// DenormalizedRelationFromParser - generate a new JSON struct based off a parse struct
+func DenormalizedRelationFromParser(item gosmparse.Relation) *DenormalizedRelation {
+	return &DenormalizedRelation{
+		ID:   item.ID,
+		Type: "relation",
+		Tags: item.Tags,
+	}
+}
diff --git a/json/denormalized_way.go b/json/denormalized_way.go
index ea35e7a..3ab62c9 100644
--- a/json/denormalized_way.go
+++ b/json/denormalized_way.go
@@ -36,11 +36,10 @@
 }
 
 // DenormalizedWayFromParser - generate a new JSON struct based off a parse struct
-func DenormalizedWayFromParser(item gosmparse.Way) *Way {
-	return &Way{
+func DenormalizedWayFromParser(item gosmparse.Way) *DenormalizedWay {
+	return &DenormalizedWay{
 		ID:   item.ID,
 		Type: "way",
 		Tags: item.Tags,
-		Refs: item.NodeIDs,
 	}
 }
diff --git a/leveldb/path.go b/leveldb/path.go
new file mode 100644
index 0000000..631fa8b
--- /dev/null
+++ b/leveldb/path.go
@@ -0,0 +1,71 @@
+package leveldb
+
+import (
+	"encoding/binary"
+
+	"github.com/missinglink/gosmparse"
+)
+
+// WritePath - encode and write an array of IDs to db
+func (c *Connection) WritePath(item gosmparse.Way) error {
+
+	// encode id
+	idBytes := make([]byte, 8)
+	binary.BigEndian.PutUint64(idBytes, uint64(item.ID))
+
+	// prefix way keys with 'W' to avoid id collisions
+	key := append([]byte{'W'}, idBytes...)
+
+	// encoded path
+	var value []byte
+
+	// iterate over node refs, appending each int64 id to the value
+	for _, ref := range item.NodeIDs {
+
+		// encode id
+		// @todo: use varint encoding to save bytes
+		refBytes := make([]byte, 8)
+		binary.BigEndian.PutUint64(refBytes, uint64(ref))
+
+		// append to slice
+		value = append(value, refBytes...)
+	}
+
+	// write to db
+	err := c.DB.Put(key, value, nil)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// ReadPath - read an array of IDs from db
+func (c *Connection) ReadPath(id int64) (*gosmparse.Way, error) {
+
+	// encode id
+	idBytes := make([]byte, 8)
+	binary.BigEndian.PutUint64(idBytes, uint64(id))
+
+	// prefix way keys with 'W' to avoid id collisions
+	key := append([]byte{'W'}, idBytes...)
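+	// note: the key is 9 bytes, the 'W' sentinel followed by the 8-byte big-endian id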
+
+	// read from db
+	data, err := c.DB.Get(key, nil)
+
+	if err != nil {
+		return nil, err
+	}
+
+	// decode node refs
+	var refs = make([]int64, 0, len(data)/8)
+	for i := 0; i < len(data); i += 8 {
+		refs = append(refs, int64(binary.BigEndian.Uint64(data[i:i+8])))
+	}
+
+	// decode item
+	return &gosmparse.Way{
+		ID:      id,
+		NodeIDs: refs,
+	}, nil
+}
diff --git a/leveldb/writer.go b/leveldb/writer.go
index 78624b3..5a299c4 100644
--- a/leveldb/writer.go
+++ b/leveldb/writer.go
@@ -12,31 +12,21 @@ import (
 
 var batchSize = 20000
 
-// CoordWriter - buffered stdout writer with sync channel
-type CoordWriter struct {
+// WriteQueue - a write channel paired with a waitgroup used to signal completion
+type WriteQueue struct {
 	Conn      *Connection
 	WaitGroup *sync.WaitGroup
-	Queue     chan kv
+	Chan      chan kv
 }
 
-type kv struct {
-	Key []byte
-	Val []byte
-}
-
-// NewCoordWriter - constructor
-func NewCoordWriter(conn *Connection) *CoordWriter {
-	w := &CoordWriter{
-		Conn:      conn,
-		WaitGroup: &sync.WaitGroup{},
-		Queue:     make(chan kv, batchSize*10),
-	}
+// Start - start the writer routine
+func (q *WriteQueue) Start() {
 
 	// start writer routine
-	w.WaitGroup.Add(1)
+	q.WaitGroup.Add(1)
 	go func() {
 		batch := new(leveldb.Batch)
-		for row := range w.Queue {
+		for row := range q.Chan {
@@ -45,7 +35,7 @@
 			if batch.Len() >= batchSize {
 
 				// write batch
-				err := w.Conn.DB.Write(batch, nil)
+				err := q.Conn.DB.Write(batch, nil)
 				if err != nil {
 					log.Println(err)
 				}
@@ -56,19 +46,57 @@
 		}
 
 		// write final batch
-		err := w.Conn.DB.Write(batch, nil)
+		err := q.Conn.DB.Write(batch, nil)
 		if err != nil {
 			log.Println(err)
 		}
 
-		w.WaitGroup.Done()
+		q.WaitGroup.Done()
 	}()
+}
+
+// Close - close the channel and block until done
+func (q *WriteQueue) Close() {
+	close(q.Chan)
+	q.WaitGroup.Wait()
+}
+
+// Writer - buffered leveldb writer with separate node and way queues
+type Writer struct {
+	Conn      *Connection
+	NodeQueue *WriteQueue
+	WayQueue  *WriteQueue
+}
+
+type kv struct {
+	Key []byte
+	Val []byte
+}
+
+// NewWriter - constructor
+func NewWriter(conn *Connection) *Writer {
+	var w = &Writer{
+		Conn: conn,
+		NodeQueue: &WriteQueue{
+			Conn:      conn,
+			WaitGroup: &sync.WaitGroup{},
+			Chan:      make(chan kv, batchSize*10),
+		},
+		WayQueue: &WriteQueue{
+			Conn:      conn,
+			WaitGroup: &sync.WaitGroup{},
+			Chan:      make(chan kv, batchSize*10),
+		},
+	}
+
+	w.NodeQueue.Start()
+	w.WayQueue.Start()
 
 	return w
 }
 
-// Enqueue - close the channel and block until done
-func (w *CoordWriter) Enqueue(item *gosmparse.Node) {
+// EnqueueNode - enqueue node bytes to be saved to db
+func (w *Writer) EnqueueNode(item *gosmparse.Node) {
 
 	// encode id
 	key := make([]byte, 8)
@@ -85,11 +113,39 @@
 
 	// value
 	value := append(lat, lon...)
 
-	w.Queue <- kv{Key: key, Val: value}
+	w.NodeQueue.Chan <- kv{Key: key, Val: value}
+}
+
+// EnqueueWay - enqueue way bytes to be saved to db
+func (w *Writer) EnqueueWay(item *gosmparse.Way) {
+
+	// encode id
+	idBytes := make([]byte, 8)
+	binary.BigEndian.PutUint64(idBytes, uint64(item.ID))
+
+	// prefix way keys with 'W' to avoid id collisions
+	key := append([]byte{'W'}, idBytes...)
+
+	// encoded path
+	var value []byte
+
+	// iterate over node refs, appending each int64 id to the value
+	for _, ref := range item.NodeIDs {
+
+		// encode id
+		// @todo: use varint encoding to save bytes
+		refBytes := make([]byte, 8)
+		binary.BigEndian.PutUint64(refBytes, uint64(ref))
+
+		// append to slice
+		value = append(value, refBytes...)
+	}
+
+	w.WayQueue.Chan <- kv{Key: key, Val: value}
 }
 
-// Close - close the channel and block until done
-func (w *CoordWriter) Close() {
-	close(w.Queue)
-	w.WaitGroup.Wait()
+// Close - close both queues and block until done
+func (w *Writer) Close() {
+	w.NodeQueue.Close()
+	w.WayQueue.Close()
 }
diff --git a/lib/bitmaskmap.go b/lib/bitmaskmap.go
index e2ac185..af4f1d3 100644
--- a/lib/bitmaskmap.go
+++ b/lib/bitmaskmap.go
@@ -15,6 +15,8 @@ type BitmaskMap struct {
 	Ways      *Bitmask
 	Relations *Bitmask
 	WayRefs   *Bitmask
+	RelNodes  *Bitmask
+	RelWays   *Bitmask
 }
 
 // NewBitmaskMap - constructor
@@ -24,6 +26,8 @@
 		Ways:      NewBitMask(),
 		Relations: NewBitMask(),
 		WayRefs:   NewBitMask(),
+		RelNodes:  NewBitMask(),
+		RelWays:   NewBitMask(),
 	}
 }
diff --git a/proxy/store_noderefs.go b/proxy/store_noderefs.go
index 3479e47..58342b3 100644
--- a/proxy/store_noderefs.go
+++ b/proxy/store_noderefs.go
@@ -9,14 +9,16 @@ import (
 // StoreRefs - filter only elements that appear in masks
 type StoreRefs struct {
 	Handler gosmparse.OSMReader
-	Writer  *leveldb.CoordWriter
+	Writer  *leveldb.Writer
 	Masks   *lib.BitmaskMap
 }
 
 // ReadNode - called once per node
 func (s *StoreRefs) ReadNode(item gosmparse.Node) {
 	if nil != s.Masks.WayRefs && s.Masks.WayRefs.Has(item.ID) {
-		s.Writer.Enqueue(&item) // write to db
+		s.Writer.EnqueueNode(&item) // write to db
+	} else if nil != s.Masks.RelNodes && s.Masks.RelNodes.Has(item.ID) {
+		s.Writer.EnqueueNode(&item) // write to db
 	}
 	if nil != s.Masks.Nodes && s.Masks.Nodes.Has(item.ID) {
 		s.Handler.ReadNode(item)
@@ -25,6 +27,9 @@
 
 // ReadWay - called once per way
 func (s *StoreRefs) ReadWay(item gosmparse.Way) {
+	if nil != s.Masks.RelWays && s.Masks.RelWays.Has(item.ID) {
+		s.Writer.EnqueueWay(&item) // write to db
+	}
 	if nil != s.Masks.Ways && s.Masks.Ways.Has(item.ID) {
 		s.Handler.ReadWay(item)
 	}
diff --git a/readme.md b/readme.md
index 5e6984a..7475d6e 100644
--- a/readme.md
+++ b/readme.md
@@ -79,6 +79,24 @@ OPTIONS:
 $ go test $(go list ./... | grep -v /vendor/)
 ```
 
+### docker
+
+#### build the docker image
+
+> this image is currently ~700MB; I would be happy to receive a PR which reduces the on-disk image size.
+
+```bash
+$ docker build -t missinglink/pbf .
+```
+
+#### run a container
+
+there are many options for `docker run` which are out of scope for this document. if you are new to docker, have a read over the docs for the options used below, and also the `-v` flag, which lets you share directories/files between the container and your host OS (see the volume-mount example below).
+
+```bash
+$ docker run --rm -it missinglink/pbf ./pbf
+```
+
 ### issues / bugs
 
 please open a github issue / open a pull request.
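
---

an illustrative `docker run` invocation showing the `-v` volume-mount mentioned in the readme section above; the host path `/tmp/osm` and container path `/data` are assumptions for the sake of example, not part of this patch:

```bash
# share a host directory containing a .pbf extract with the container;
# commands run inside the container can then read files under /data
$ docker run --rm -it -v /tmp/osm:/data missinglink/pbf ./pbf
```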
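the `@todo: use varint encoding to save bytes` comments in `leveldb/path.go` and `leveldb/writer.go` could be addressed with the standard library's `encoding/binary` varint helpers; a minimal sketch, with delta-encoding as an extra assumption that is not part of this patch:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeRefs packs node refs as varint-encoded deltas; consecutive
// refs within a way tend to be numerically close, so deltas stay small
func encodeRefs(refs []int64) []byte {
	buf := make([]byte, binary.MaxVarintLen64)
	var out []byte
	var prev int64
	for _, ref := range refs {
		n := binary.PutVarint(buf, ref-prev) // signed delta from the previous ref
		out = append(out, buf[:n]...)
		prev = ref
	}
	return out
}

// decodeRefs reverses encodeRefs
func decodeRefs(data []byte) []int64 {
	var refs []int64
	var prev int64
	for len(data) > 0 {
		delta, n := binary.Varint(data)
		prev += delta
		refs = append(refs, prev)
		data = data[n:]
	}
	return refs
}

func main() {
	refs := []int64{1000000001, 1000000005, 1000000009}
	enc := encodeRefs(refs)
	fmt.Println(len(enc), decodeRefs(enc)) // 7 bytes rather than 24
}
```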