在使用消息队列时,消息流转是常见的需求,比如消息需要从ckafka的实例转储到另一个ckafka实例。消息流转的目的通常是为了能够访问不同网络的消息队列,这是因为云上的消息队列通常只开放内网访问。在很多情况下,用户往往是在云服务器cvm中部署一个中转程序。这种方案存在以下不足:
资源可能过度供给。云服务器只用于消息转储太浪费。
资源可能严重不足。在生产或者消费处于高峰时,云服务器无法自动扩展资源。
从一点出发,云函数可以很好地克服云服务器在消息流传上的不足。
云函数的特点是
无服务器云函数可以让用户无需关心服务器的部署运营,只需开发最核心的业务逻辑,即可实现上线运营,具备分布容灾能力,可依据负载自动扩缩容,按照实际调用次数与时长计费
与云服务器相比,云函数的优势在于:
便宜。按量付费,用多少资源交多少钱。
省心。全自动运维,资源自动伸缩。
详细方案
以TDMQ为例,要实现消息在不同网络的消息队列中流转,需要在消息队列之间使用云函数作为中间件,如下图所示:
这里的难点在于需要将消息队列的生产者和消费者程序按云函数的模板改写,下面使用golang语言,以生产者为例进行说明。参考了云函数的golang文档。
一个hello程序如下所示:
package main
import (
"context"
"fmt"
"github.com/tencentyun/scf-go-lib/cloudfunction"
)
type DefineEvent struct {
// test event define
Key1 string `json:"key1"`
Key2 string `json:"key2"`
}
func hello(ctx context.Context, event DefineEvent) (string, error) {
fmt.Println("key1:", event.Key1)
fmt.Println("key2:", event.Key2)
return fmt.Sprintf("Hello %s!", event.Key1), nil
}
func main() {
// Make the handler available for Remote Procedure Call by Cloud Function
cloudfunction.Start(hello)
}
这里按照TDMQ golang模板把上面改写为:
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package main
import (
"context"
"log"
"strconv"
"github.com/TencentCloud/tdmq-go-client/pulsar"
"github.com/tencentyun/scf-go-lib/cloudfunction"
)
type DefineEvent struct {
// test event define
Key1 string `json:"key1"`
Key2 string `json:"key2"`
}
func produce(ctx context.Context, event DefineEvent) {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://ip:6000",
ListenerName: "custom:pulsar-*********/vpc-*********/subnet-*********",
Authentication: pulsar.NewAuthenticationToken("eyJrZX*********4XRTqs"),
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
DisableBatching: true,
Topic: "persistent://pulsar-*********/*********/*********",
})
if err != nil {
log.Fatal(err)
}
defer producer.Close()
//ctx := context.Background()
for j := 0; j < 10; j++ {
if msgID, err := producer.Send(ctx, &pulsar.ProducerMessage{
Payload: []byte("Hello " + strconv.Itoa(j)),
}); err != nil {
log.Fatal(err)
} else {
log.Println("Published message: ", msgID)
}
}
}
func main() {
cloudfunction.Start(produce)
}
接着就可以编译,然后上传验证了。