为什么我要写这篇文章

作为一枚技术 Doge,每天总免不了和不懂技术的老板 探(si) 讨(bi) 业务的实现可能性;

前一段日子,老板在深(bi) 入(jiao) 调(jia) 研(ge) 之后决定引入一个第三方 IOT 平台,通过 RESTful API 实时反馈设备的监测数据,而因为公司业务的特殊性,所获得的原始监测数据我们无法直接使用,而是必须经过一番计算。以为每位登录后台的用户显示过去一周的实时能源消耗为例,我们需要在十五秒内完成大约 60 MB (120 万条记录) 数据的提取、清洗及计算,而这显然超出了浏览器和单次 HTTP 请求所能承受的极限。这个需求让我想起了之前自己学习 Golang 练手时写过的一个分布式计算模型,于是趁此机会把它扩展了一下,写成框架发布到社区里,有兴趣的童鞋可以 star 回去试验一下哦。

为什么要用 Go 重新造轮子

Golang 是 Google 在2007年发布的一门开源的静态编译型编程语言,在垃圾回收、结构类型以及并发编程的处理上拥有自己的独到之处,近年来更是成为使用频率上升速度最快的编程语言之一。 可以参考这篇文章,Go 在打包编译后的性能与 Java 或 C++相似。在我们的使用中,Go 一般比 Python 要快 30 倍。同时,其天然支持的 CSP 模型在很多情况下可以免去了消息队列的使用,对高并发场景独到的处理优势,更是能够大大缩短程序员的开发时间哦。

GoCollaborate 是什么?

很多同学看到这里可能会问了,这个框架什么?我又能用它来做什么呢?

简而言之,

GoCollaborate 是一个提供分布式服务管理搭建的轻量级通用框架,您可以轻松地用它进行编程,构建扩展,以及创建自己的高性能分布式服务。

有相关从业经验的同学可能听说过 Apache Hadoop,Spark,Lightbend 的 Akka, 阿里的 Dubbo 以及 Facebook 的 Thrift 等等,一套工具集下来是不是感觉晕头转向呢?不要紧,我们在这姑且暂时把它当成一个轻量级的 Hadoop 好了,随着教程展开,让我们一起来体验 Golang 的神奇魅力。

下面我们用一个简单的应用展示框架的基本用法和原理,更多应用请参考官方例库,或者直接提交 issue,我觉得有价值的会后续补充上去。

正文

首先是安装:

go get -u github.com/GoCollaborate/src

然后为你的项目创建基本结构,创建好之后看起来像这样

[Your_Project_Name]
┬
├ [core]
    ┬
    └ example.go
├ case.json
└ main.go
case.json
{
    "caseid": "GoCollaborateStandardCase",
    "cards": {
        "localhost:57851": {
            "ip": "localhost",
            "port": 57851,
            "alive": false,
            "seed": false
        },
        "localhost:57852": {
            "ip": "localhost",
            "port": 57852,
            "alive": true,
            "seed": true
        }
    },
    "timestamp": 1508619931,
    "local": {
        "ip": "localhost",
        "port": 57852,
        "alive": true,
        "seed": true
    },
    "coordinator": {
        "ip": "localhost",
        "port": 0,
        "alive": true,
        "seed": false
    }
}
caseidcardslocal

然后打开刚才创建的example.go,给我们的计算任务写几个函数:

package core

import (
    "fmt"
    "github.com/GoCollaborate/src/artifacts/task"
    "github.com/GoCollaborate/src/wrappers/taskHelper"
    "net/http"
)

// 任务处理器
func ExampleJobHandler(w http.ResponseWriter, r *http.Request) *task.Job {
    // 创建一个 Job 实例
    job := task.MakeJob()
    // 将任务输入 Job 的队列
    job.Tasks(&task.Task{task.SHORT,
        task.BASE, "exampleFunc",
        task.Collection{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4},
        task.Collection{0},
        task.NewTaskContext(struct{}{}), 0})
    // 为当前阶段指定执行器,这里我们简单做一次 Map-Reduce
    job.Stacks("core.ExampleTask.Mapper", "core.ExampleTask.Reducer")
    
    // 这里大家可以根据需要为 HTTP 请求返回内容
    // ...
    
    return job
}

// 任务调用的处理函数
func ExampleFunc(source *task.Collection,
    result *task.Collection,
    context *task.TaskContext) bool {

    fmt.Println("Example Task Executed...")
    
    var total int
    // 计算数据集内数据的总和
    for _, n := range *source {
        total += n.(int)
    }
    
    // 将总和写入结果集
    result.Append(total)
    return true
}

type SimpleMapper int

func (m *SimpleMapper) Map(inmaps map[int]*task.Task) (map[int]*task.Task, error) {
    // 将任务平均映射成三个子任务
    return taskHelper.Slice(inmaps, 3), nil
}

type SimpleReducer int

func (r *SimpleReducer) Reduce(maps map[int]*task.Task) (map[int]*task.Task, error) {
    var sum int
    
    // 根据返回结果计算总和
    for _, s := range maps {
        for _, r := range (*s).Result {
            sum += r.(int)
        }
    }
    fmt.Printf("The sum of numbers is: %v \n", sum)
    fmt.Printf("The task set is: %v", maps)
    return maps, nil
}


main.go
package main

import (
    "./core"
    "github.com/GoCollaborate/src"
)
func main() {
    mp := new(core.SimpleMapper)
    rd := new(core.SimpleReducer)
    collaborate.Set("Function", core.ExampleFunc, "exampleFunc")
    collaborate.Set("Mapper", mp, "core.ExampleTask.Mapper")
    collaborate.Set("Reducer", rd, "core.ExampleTask.Reducer")
    collaborate.Set("Shared", []string{"GET", "POST"}, core.ExampleJobHandler)
    collaborate.Run()
}

跑一下,看能运行吗?

go run main.go -mode=clbt

刚才创建的任务函数将被映射到

http://localhost:8080/core/ExampleJobHandler

退出程序,我们把刚才创建的项目文件夹复制一份,开始真正的分布式计算:

cp Your_Project_Name Your_Project_Name_Copy
case.json
{
    "caseid": "GoCollaborateStandardCase",
    "cards": {
        "localhost:57852": {
            "ip": "localhost",
            "port": 57852,
            "alive": true,
            "seed": true
        }
    },
    "timestamp": 1508619931,
    "local": {
        "ip": "localhost",
        "port": 57851,
        "alive": true,
        "seed": false
    },
    "coordinator": {
        "ip": "localhost",
        "port": 0,
        "alive": true,
        "seed": false
    }
}

保存,退出,然后依次进入不同目录下启动两个项目,这里如果大家在本地运行的话记得加个参数,记得把第二个应用的端口设为8081以免冲突哦:

go run main.go -mode=clbt -port=8081
go run main.go -mode=clbt

现在可以访问:

http://localhost:8080/core/ExampleJobHandler
// and 
http://localhost:8081/core/ExampleJobHandler

执行刚才注册的任务啦,看看控制台,是不是输出了什么?


然后还有一个做了一半的 UI,提供一点基本统计分析:

http://localhost:8080


因为是个人项目,肯定还有很多不足,最后再附上 github 链接 GoCollaborate ,欢迎大家提交 issue 或者拍砖,当然愿意贡献代码的大虾就更欢迎了,谢谢阅读!

P.S. 框架的全文文档都在这里,本文的例子在这里。