本文是使用 golang 实现 redis 系列的第九篇,主要介绍如何使用 GeoHash 实现搜索附近的人。

搜索附近的POI是一个非常常见的功能,它的技术难点在于地理位置是二维的(经纬度)而我们常用的索引(无论是B树、红黑树还是跳表)都是一维的。GeoHash 算法的本质就是将二维的经纬度转换为一维的表示。

本文核心实现代码可以在Godis:lib/geohash中找到。也可以下载Godis来亲自体验。

兴趣点(Point Of Intererst, POI): 在电子地图中我们关心的各种地点被称为POI, 比如餐厅、超市、写字楼。POI 通常包含名称、经纬度、描述等信息。在搜索附近的人时,你也可以把附近的用户称为POI。

GeoHash 原理
[-180,180][-90,90]

[-90, 0][0, 90][-180, 0][0, 180]

我们对四个小矩形继续二等分:

两次等分后的矩形需要用 4bit 来表示。前两位是第一次等分后所在大矩形的编码,后两位表示第二次分割出的小矩形在大矩形中的位置。

对这些矩形进行进一步分割,分割次数越多矩形越小经度越高,最终我们可以得到最够小的矩形来表示一个点。这个小矩形的编码可以代替这个点的坐标,矩形的边长就是GeoHash的误差。

这种分割方式让我们可以用Z型曲线涂满整个地图,这个Z型曲线叫做Peano空间填充曲线。在大多数情况下空间上相邻的点编码也非常相似。少数情况下编码会发生突变,导致编码相似的点空间距离却很大(比如上图中的0111与1000)。

如图所示除Peano空间填充曲线外还有很多空间填充曲线,其中效果公认较好是Hilbert空间填充曲线。相较于Peano曲线而言,Hilbert曲线没有较大的突变。但是由于Peano曲线实现更加简单,所以 Geohash 算法采用的是Peano空间填充曲线。

实现 GeoHash 编解码

看过上文的介绍,我们可以很快的写出GeoHash的编码过程:

// 返回二进制编码和对应矩形的经纬度范围
func encode0(latitude, longitude float64, bitSize uint) ([]byte, [2][2]float64) {
box := [2][2]float64{ // Geohash的矩形
{-180, 180}, // 经度
{-90, 90}, // 纬度
}
pos := [2]float64{longitude, latitude}
bits := make([]bit, 0)
var precision uint = 0
for precision < bitSize { // 循环直到精度足够
for direction, val := range pos { // 轮流处理经纬度,p.s. 看到这个循环了吗?你可以很方便的把GeoHash推广到N维空间
mid := (box[direction][0] + box[direction][1]) / 2 // 计算分割点
if val < mid {
// 经(纬)度小于中点,编码填0,把一下次二分的上界设为当前区间的中点
box[direction][1] = mid
bits = append(bits, 0)
} else {
// 经(纬)度大于中点,编码填1,把一下次二分的下界设为当前区间的中点
box[direction][0] = mid
bits = append(bits, 1)
}
bit++
precision++
if precision == bitSize {
break
}
}
}
return []byte(bits), box
}

代码非常简单,类似于二分查找。遗憾的是和大多数语言一样 golang 操作二进制数据的最小单位是 byte 而非 bit,所以我们需要额外做一些工作来实现按bit编码:

// 这才是真正的实现,请关注与上一节代码的不同
var bits = []uint8{128, 64, 32, 16, 8, 4, 2, 1} func encode0(latitude, longitude float64, bitSize uint) ([]byte, [2][2]float64) {
box := [2][2]float64{
{-180, 180}, // lng
{-90, 90}, // lat
}
pos := [2]float64{longitude, latitude}
hash := &bytes.Buffer{}
bit := 0
var precision uint = 0
code := uint8(0)
for precision < bitSize {
for direction, val := range pos {
mid := (box[direction][0] + box[direction][1]) / 2
if val < mid {
box[direction][1] = mid
// 编码默认为0,不需要操作
} else {
box[direction][0] = mid
code |= bits[bit]
// 通过位或操作写入1,比如要在字节的第3位写入1应该 code |= 32
}
bit++
if bit == 8 { // 计算完一个字节的编码,将其写入buffer
hash.WriteByte(code)
bit = 0
code = 0
}
precision++
if precision == bitSize {
break
}
}
}
// precision 可能无法被 8 整除,此时剩下的二进制编码写到最后
if code > 0 {
hash.WriteByte(code)
}
return hash.Bytes(), box
}

为了方便传输GeoHash定义了一种文本格式的编码,它是将二进制编码进行Base32变换后得到的:

// GeoHash的映射表和标准Base32映射表有些不同
var enc = base32.NewEncoding("0123456789bcdefghjkmnpqrstuvwxyz").WithPadding(base32.NoPadding)
func ToString(buf []byte) string {
return enc.EncodeToString(buf)
}

写完代码后可以到www.geohash.cn上测试一下结果是否正确。

跟随二进制编码的指示进行二分既可完成解码的过程:

func decode0(hash []byte) [][]float64 {
box := [][]float64{
{-180, 180},
{-90, 90},
}
direction := 0
for i := 0; i < len(hash); i++ {
code := hash[i]
for j := 0; j < len(bits); j++ {
mid := (box[direction][0] + box[direction][1]) / 2
mask := bits[j] // 使用掩码取出指定位
if mask&code > 0 {
// 经(纬)度大于mid
box[direction][0] = mid
} else {
// 经(纬)度小于mid
box[direction][1] = mid
}
direction = (direction + 1) % 2
}
}
return box
}

解码过程不能得到精确的结果只能得到对应矩形的经纬度范围,我们通常使用矩形的中心点来作为编码对应的坐标。

搜索附近

因为位于同一个矩形中的点它们的GeoHash编码拥有相同的前缀,所以我们可以非常容易的找到某个矩形中的所有POI。

理论上我们在使用GeoHash来搜索附近POI时只需要找到一个合适的矩形然后找出其中所有POI即可,实际上我们面临两个问题:

  1. 即上文提到的GeoHash值突变会导致编码相近两个点空间距离很大。
  2. 若我们的位置在矩形的边缘, 那么隔壁矩形里的POI可能会更近。

若我们位于红点位置,北面的绿点虽然与我们不在同一个矩形内但显然更近。

为了解决这些问题,我们除了搜索定位点所在的矩形外,还搜索周围8个区域内的POI。

这样搜索附近需要分为两步来实现:

  1. 计算所有POI的GeoHash值,并使用跳表或B+树等便于进行范围查询的数据结构建立索引
  2. 计算“附近区域”对应的 GeoHash 编码,找出这些区域内的所有POI

建立空间索引

我们将GeoHash的精度设置为64位,这样我们就可以将GeoHash编码转换成uint64类型存入 SortedSet 数据结构中:

func ToInt(buf []byte) uint64 {
if len(buf) < 8 { // 用0填充不足的位数
buf2 := make([]byte, 8)
copy(buf2, buf)
return binary.BigEndian.Uint64(buf2)
}
return binary.BigEndian.Uint64(buf)
}
[0x6000000000000000, 0x7000000000000000)

使用SortedSet 进行索引的代码可以在 Godis:db/geo.go 中找到。

找到附近的区域

我们知道地球半径约为 6371km 那么第一次分割后我们得到了四个东西宽 6371π km 南北高 3186π km 的矩形,以此递推在分割10次后我们可以得到宽约40km高约20km的矩形。也就是说 20bit 的 GeoHash 编码东西方向上的误差为 40km, 南北方向误差为 20 km。

我们在wikipedia 上查到 geohash 的误差范围:

表格中的geohash length 是 base32 编码后的字符串长度,1个字符可以表示5位(bit)。

我们也可以用代码估算指定的距离需要的geohash length:

func estimatePrecisionByRadius(radiusMeters float64, latitude float64) uint {
if radiusMeters == 0 {
return defaultBitSize - 1
}
var precision uint = 1
for radiusMeters < MercatorMax {
radiusMeters *= 2
precision++
} precision -= 2
if latitude > 66 || latitude < -66 {
precision--
if latitude > 80 || latitude < -80 {
precision--
}
}
if precision < 1 {
precision = 1
}
if precision > 32 {
precision = 32
}
return precision*2 - 1
}

这段代码中需要注意两点:

[-180,180][-90,90]

接下来我们计算矩形区域对应的uint64编码的上下界:

func ToRange(scope []byte, precision uint) [2]uint64 {
lower := ToInt(scope)
radius := uint64(1 << (64 - precision))
upper := lower + radius
return [2]uint64{lower, upper}
}
[0110..., 0111...)

下一步是计算九宫格的GeoHash编码,我们采用了计算各个矩形中心点经纬度然后重新编码的实现方式:

func GetNeighbours(latitude, longitude, radiusMeters float64) [][2]uint64 {
precision := estimatePrecisionByRadius(radiusMeters, latitude) center, box := encode0(latitude, longitude, precision)
height := box[0][1] - box[0][0]
width := box[1][1] - box[1][0]
centerLng := (box[0][1] + box[0][0]) / 2
centerLat := (box[1][1] + box[1][0]) / 2
maxLat := ensureValidLat(centerLat + height)
minLat := ensureValidLat(centerLat - height)
maxLng := ensureValidLng(centerLng + width)
minLng := ensureValidLng(centerLng - width) var result [10][2]uint64
leftUpper, _ := encode0(maxLat, minLng, precision)
result[1] = ToRange(leftUpper, precision)
upper, _ := encode0(maxLat, centerLng, precision)
result[2] = ToRange(upper, precision)
rightUpper, _ := encode0(maxLat, maxLng, precision)
result[3] = ToRange(rightUpper, precision)
left, _ := encode0(centerLat, minLng, precision)
result[4] = ToRange(left, precision)
result[5] = ToRange(center, precision)
right, _ := encode0(centerLat, maxLng, precision)
result[6] = ToRange(right, precision)
leftDown, _ := encode0(minLat, minLng, precision)
result[7] = ToRange(leftDown, precision)
down, _ := encode0(minLat, centerLng, precision)
result[8] = ToRange(down, precision)
rightDown, _ := encode0(minLat, maxLng, precision)
result[9] = ToRange(rightDown, precision) return result[1:]
}

也可以通过某个矩形的编码快速推断出附近8个矩形的编码, 这种方式实现难度较高可以参考 Redis 中的实现:

最后在 SortedSet 中找到 POI:

func geoRadius0(sortedSet *sortedset.SortedSet, lat float64, lng float64, radius float64) redis.Reply {
areas := geohash.GetNeighbours(lat, lng, radius)
members := make([][]byte, 0)
for _, area := range areas {
lower := &sortedset.ScoreBorder{Value: float64(area[0])}
upper := &sortedset.ScoreBorder{Value: float64(area[1])}
elements := sortedSet.RangeByScore(lower, upper, 0, -1, true)
for _, elem := range elements {
members = append(members, []byte(elem.Member))
}
}
return reply.MakeMultiBulkReply(members)
}

OK, 大功告成。