为什么我要写这篇文章
作为一枚技术 Doge,每天总免不了和不懂技术的老板 探(si) 讨(bi) 业务的实现可能性;
前一段日子,老板在深(bi) 入(jiao) 调(jia) 研(ge) 之后决定引入一个第三方 IOT 平台,通过 RESTful API 实时反馈设备的监测数据,而因为公司业务的特殊性,所获得的原始监测数据我们无法直接使用,而是必须经过一番计算。以为每位登录后台的用户显示过去一周的实时能源消耗为例,我们需要在十五秒内完成大约 60 MB (120 万条记录) 数据的提取、清洗及计算,而这显然超出了浏览器和单次 HTTP 请求所能承受的极限。这个需求让我想起了之前自己学习 Golang 练手时写过的一个分布式计算模型,于是趁此机会把它扩展了一下,写成框架发布到社区里,有兴趣的童鞋可以 star 回去试验一下哦。
为什么要用 Go 重新造轮子
Golang 是 Google 在2007年发布的一门开源的静态编译型编程语言,在垃圾回收、结构类型以及并发编程的处理上拥有自己的独到之处,近年来更是成为使用频率上升速度最快的编程语言之一。 可以参考这篇文章,Go 在打包编译后的性能与 Java 或 C++相似。在我们的使用中,Go 一般比 Python 要快 30 倍。同时,其天然支持的 CSP 模型在很多情况下可以免去了消息队列的使用,对高并发场景独到的处理优势,更是能够大大缩短程序员的开发时间哦。
GoCollaborate 是什么?
很多同学看到这里可能会问了,这个框架什么?我又能用它来做什么呢?
简而言之,
GoCollaborate 是一个提供分布式服务管理搭建的轻量级通用框架,您可以轻松地用它进行编程,构建扩展,以及创建自己的高性能分布式服务。
有相关从业经验的同学可能听说过 Apache Hadoop,Spark,Lightbend 的 Akka, 阿里的 Dubbo 以及 Facebook 的 Thrift 等等,一套工具集下来是不是感觉晕头转向呢?不要紧,我们在这姑且暂时把它当成一个轻量级的 Hadoop 好了,随着教程展开,让我们一起来体验 Golang 的神奇魅力。
下面我们用一个简单的应用展示框架的基本用法和原理,更多应用请参考官方例库,或者直接提交 issue,我觉得有价值的会后续补充上去。
正文
首先是安装:
go get -u github.com/GoCollaborate/src
然后为你的项目创建基本结构,创建好之后看起来像这样
[Your_Project_Name]
┬
├ [core]
┬
└ example.go
├ case.json
└ main.go
case.json
{
"caseid": "GoCollaborateStandardCase",
"cards": {
"localhost:57851": {
"ip": "localhost",
"port": 57851,
"alive": false,
"seed": false
},
"localhost:57852": {
"ip": "localhost",
"port": 57852,
"alive": true,
"seed": true
}
},
"timestamp": 1508619931,
"local": {
"ip": "localhost",
"port": 57852,
"alive": true,
"seed": true
},
"coordinator": {
"ip": "localhost",
"port": 0,
"alive": true,
"seed": false
}
}
caseidcardslocal
然后打开刚才创建的example.go,给我们的计算任务写几个函数:
package core
import (
"fmt"
"github.com/GoCollaborate/src/artifacts/task"
"github.com/GoCollaborate/src/wrappers/taskHelper"
"net/http"
)
// 任务处理器
func ExampleJobHandler(w http.ResponseWriter, r *http.Request) *task.Job {
// 创建一个 Job 实例
job := task.MakeJob()
// 将任务输入 Job 的队列
job.Tasks(&task.Task{task.SHORT,
task.BASE, "exampleFunc",
task.Collection{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4},
task.Collection{0},
task.NewTaskContext(struct{}{}), 0})
// 为当前阶段指定执行器,这里我们简单做一次 Map-Reduce
job.Stacks("core.ExampleTask.Mapper", "core.ExampleTask.Reducer")
// 这里大家可以根据需要为 HTTP 请求返回内容
// ...
return job
}
// 任务调用的处理函数
func ExampleFunc(source *task.Collection,
result *task.Collection,
context *task.TaskContext) bool {
fmt.Println("Example Task Executed...")
var total int
// 计算数据集内数据的总和
for _, n := range *source {
total += n.(int)
}
// 将总和写入结果集
result.Append(total)
return true
}
type SimpleMapper int
func (m *SimpleMapper) Map(inmaps map[int]*task.Task) (map[int]*task.Task, error) {
// 将任务平均映射成三个子任务
return taskHelper.Slice(inmaps, 3), nil
}
type SimpleReducer int
func (r *SimpleReducer) Reduce(maps map[int]*task.Task) (map[int]*task.Task, error) {
var sum int
// 根据返回结果计算总和
for _, s := range maps {
for _, r := range (*s).Result {
sum += r.(int)
}
}
fmt.Printf("The sum of numbers is: %v \n", sum)
fmt.Printf("The task set is: %v", maps)
return maps, nil
}
main.go
package main
import (
"./core"
"github.com/GoCollaborate/src"
)
func main() {
mp := new(core.SimpleMapper)
rd := new(core.SimpleReducer)
collaborate.Set("Function", core.ExampleFunc, "exampleFunc")
collaborate.Set("Mapper", mp, "core.ExampleTask.Mapper")
collaborate.Set("Reducer", rd, "core.ExampleTask.Reducer")
collaborate.Set("Shared", []string{"GET", "POST"}, core.ExampleJobHandler)
collaborate.Run()
}
跑一下,看能运行吗?
go run main.go -mode=clbt
刚才创建的任务函数将被映射到
http://localhost:8080/core/ExampleJobHandler
退出程序,我们把刚才创建的项目文件夹复制一份,开始真正的分布式计算:
cp Your_Project_Name Your_Project_Name_Copy
case.json
{
"caseid": "GoCollaborateStandardCase",
"cards": {
"localhost:57852": {
"ip": "localhost",
"port": 57852,
"alive": true,
"seed": true
}
},
"timestamp": 1508619931,
"local": {
"ip": "localhost",
"port": 57851,
"alive": true,
"seed": false
},
"coordinator": {
"ip": "localhost",
"port": 0,
"alive": true,
"seed": false
}
}
保存,退出,然后依次进入不同目录下启动两个项目,这里如果大家在本地运行的话记得加个参数,记得把第二个应用的端口设为8081以免冲突哦:
go run main.go -mode=clbt -port=8081
go run main.go -mode=clbt
现在可以访问:
http://localhost:8080/core/ExampleJobHandler
// and
http://localhost:8081/core/ExampleJobHandler
执行刚才注册的任务啦,看看控制台,是不是输出了什么?
然后还有一个做了一半的 UI,提供一点基本统计分析:
http://localhost:8080
因为是个人项目,肯定还有很多不足,最后再附上 github 链接 GoCollaborate ,欢迎大家提交 issue 或者拍砖,当然愿意贡献代码的大虾就更欢迎了,谢谢阅读!
P.S. 框架的全文文档都在这里,本文的例子在这里。