Kafka 不支持延迟消息，RocketMQ 支持的延迟消息粒度有限；Pulsar（https://github.com/apache/pulsar）采用优先队列的方式实现，支持任意粒度的延迟消息。不过，如果存在大量延迟时间较长的消息，内存消耗会比较严重。本文学习如何在 Mac 上用 Docker 搭建 Pulsar，并通过 Go 的 SDK 实现消息的发布和订阅。
docker pull apachepulsar/pulsar:latest
latest: Pulling from apachepulsar/pulsar
d7bfe07ed847: Pull complete
75dfa6203d6c: Pull complete
797c4fa83169: Pull complete
5f506e0917d3: Pull complete
2bf3a856127c: Pull complete
378ca9dc24a7: Pull complete
281b3f6d1348: Pull complete
c69f798d8aaf: Pull complete
7b05439db137: Pull complete
Digest: sha256:4a952b3c662b94247ffc4ff17be16ef176c293baaf346db13a095970f43adfd6
Status: Downloaded newer image for apachepulsar/pulsar:latest
docker.io/apachepulsar/pulsar:latest
docker run -d -it -p 6650:6650 -p 8080:8080 --name pulsar-standalone apachepulsar/pulsar:latest bin/pulsar standalone
935d2e372d0cf924967c6c6b9851b51a12b684e6c50acf9e273f7f8ea1f0de67
docker cp 935d2e372d0cf924967c6c6b9851b51a12b684e6c50acf9e273f7f8ea1f0de67:/pulsar/conf learn/pulsar
docker cp 935d2e372d0cf924967c6c6b9851b51a12b684e6c50acf9e273f7f8ea1f0de67:/pulsar/data learn/pulsar
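上面的容器只是为了把默认的 conf 和 data 目录拷贝到宿主机（docker cp 的目标是相对路径 learn/pulsar，需要在 home 目录下执行，才能与下面挂载的 ~/learn/pulsar 对应）。拷贝完成后先停止并删除这个容器（docker rm -f pulsar-standalone），否则容器名和端口会冲突；然后挂载这两个目录重新启动：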
docker run -d -it -p 6650:6650 -p 8080:8080 -v ~/learn/pulsar/data:/pulsar/data -v ~/learn/pulsar/conf:/pulsar/conf --add-host=host.docker.internal:host-gateway --name pulsar-standalone apachepulsar/pulsar:latest bin/pulsar standalone
启动后用自带的命令行客户端测试一下，向 my-topic 生产一条消息：
docker exec -it pulsar-standalone bash bin/pulsar-client produce my-topic --messages "hello-pulsar"
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.pulsar.common.util.netty.DnsResolverUtil (file:/pulsar/lib/org.apache.pulsar-pulsar-common-2.10.1.jar) to method sun.net.InetAddressCachePolicy.get()
WARNING: Please consider reporting this to the maintainers of org.apache.pulsar.common.util.netty.DnsResolverUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2022-11-06T09:24:09,639+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xe306c336, L:/127.0.0.1:40538 - R:localhost/127.0.0.1:6650]] Connected to server
2022-11-06T09:24:13,141+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"my-topic","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":1000,"maxPendingMessagesAcrossPartitions":50000,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
2022-11-06T09:24:13,425+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650/","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"useTcpNoDelay":true,"useTls":false,"tlsTrustCertsFilePath":"","tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":"","tlsTrustStorePassword":"*****","tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":0,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
2022-11-06T09:24:13,515+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [my-topic] [null] Creating producer on cnx [id: 0xe306c336, L:/127.0.0.1:40538 - R:localhost/127.0.0.1:6650]
2022-11-06T09:24:13,625+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [my-topic] [standalone-1-3] Created producer on cnx [id: 0xe306c336, L:/127.0.0.1:40538 - R:localhost/127.0.0.1:6650]
2022-11-06T09:24:14,118+0000 [main] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
2022-11-06T09:24:14,258+0000 [main] INFO org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: pulsar://localhost:6650/
2022-11-06T09:24:14,299+0000 [main] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [my-topic] [standalone-1-3] Pending messages: 0 --- Publish throughput: 1.20 msg/s --- 0.00 Mbit/s --- Latency: med: 0.000 ms - 95pct: 0.000 ms - 99pct: 0.000 ms - 99.9pct: 0.000 ms - max: -? ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 12.000 bytes - 95pct: 12.000 bytes - 99pct: 12.000 bytes - 99.9pct: 12.000 bytes - max: 12.000 bytes --- Ack received rate: 0.00 ack/s --- Failed messages: 0 --- Pending messages: 0
2022-11-06T09:24:14,342+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [my-topic] [standalone-1-3] Closed Producer
2022-11-06T09:24:14,434+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0xe306c336, L:/127.0.0.1:40538 ! R:localhost/127.0.0.1:6650] Disconnected
2022-11-06T09:24:16,530+0000 [main] INFO org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully produced
当然也可以部署管理后台 pulsar-manager（注意：SPRING_CONFIGURATION_FILE 是在容器内解析的路径，如果要使用宿主机上的配置文件，需要另外用 -v 把它挂载进容器）：
docker run -it \
-p 9527:9527 -p 7750:7750 \
-e SPRING_CONFIGURATION_FILE=~/learn/pulsar/pulsar-manager/application.properties \
apachepulsar/pulsar-manager
然后启动我们的消费者
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/apache/pulsar-client-go/pulsar"
)
// main connects to the Pulsar broker at pulsar://localhost:6650, subscribes to
// "my-topic" using the shared subscription "my-sub", prints and acknowledges
// ten messages, and finally removes the subscription.
//
// Any error is fatal: it is logged and the process exits.
func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		// A comma-separated broker list is also supported, e.g.
		// "pulsar://localhost:6650,localhost:6651,localhost:6652".
		URL:               "pulsar://localhost:6650",
		OperationTimeout:  60 * time.Second,
		ConnectionTimeout: 60 * time.Second,
	})
	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client: %v", err)
	}
	// Register Close only after the error check: when NewClient fails the
	// returned client may be nil, and deferring a method call on it would
	// panic before the real error is reported.
	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "my-topic",
		SubscriptionName: "my-sub",
		Type:             pulsar.Shared,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Receive blocks until a message is available, so this loop only ends
	// after ten messages have been consumed.
	for i := 0; i < 10; i++ {
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Fatal(err)
		}

		fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
			msg.ID(), string(msg.Payload()))

		consumer.Ack(msg)
	}

	if err := consumer.Unsubscribe(); err != nil {
		log.Fatal(err)
	}
}
% go run learn/pulsar/consumer/main.go
INFO[0000] Connecting to broker remote_addr="pulsar://localhost:6650"
INFO[0000] TCP connection established local_addr="[::1]:55467" remote_addr="pulsar://localhost:6650"
INFO[0000] Connection is ready local_addr="[::1]:55467" remote_addr="pulsar://localhost:6650"
INFO[0000] Connected consumer consumerID=1 name=tskmj subscription=my-sub topic="persistent://public/default/my-topic"
INFO[0000] Created consumer consumerID=1 name=tskmj subscription=my-sub topic="persistent://public/default/my-topic"
Received message msgId: pulsar.trackingMessageID{messageID:pulsar.messageID{ledgerID:25, entryID:1, batchIdx:0, partitionIdx:0}, tracker:(*pulsar.ackTracker)(nil), consumer:(*pulsar.partitionConsumer)(0xc0002b8000), receivedTime:time.Time{wall:0xc0d1eb16ee415ab8, ext:120175243016, loc:(*time.Location)(0x4c06960)}} -- content: 'hello'
然后用 Go 编写生产者，向同一个 topic 发送一条消息：
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/apache/pulsar-client-go/pulsar"
)
// main connects to the Pulsar broker at pulsar://localhost:6650, prints the
// partitions of "my-topic", and publishes a single "hello" message to it.
//
// Failure to create the client or the producer is fatal. A failed send is
// reported on stdout and the program returns without claiming success.
func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		// A comma-separated broker list is also supported, e.g.
		// "pulsar://localhost:6650,localhost:6651,localhost:6652".
		URL:               "pulsar://localhost:6650",
		OperationTimeout:  60 * time.Second,
		ConnectionTimeout: 60 * time.Second,
	})
	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client: %v", err)
	}
	// Register Close only after the error check: when NewClient fails the
	// returned client may be nil, and deferring a method call on it would
	// panic before the real error is reported.
	defer client.Close()

	// Prints both return values (the partition list and the error).
	fmt.Println(client.TopicPartitions("my-topic"))

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: "my-topic",
	})
	if err != nil {
		log.Fatal(err)
	}
	// Defer immediately after creation so the producer is closed on every
	// return path below, including a failed send.
	defer producer.Close()

	_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
		Payload: []byte("hello"),
	})
	if err != nil {
		fmt.Println("Failed to publish message", err)
		// Return instead of falling through, so a failed send is not
		// followed by a misleading "Published message".
		return
	}
	fmt.Println("Published message")
}
go run learn/pulsar/producer/main.go
INFO[0000] Connecting to broker remote_addr="pulsar://localhost:6650"
INFO[0000] TCP connection established local_addr="[::1]:55454" remote_addr="pulsar://localhost:6650"
INFO[0000] Connection is ready local_addr="[::1]:55454" remote_addr="pulsar://localhost:6650"
INFO[0000] Connected producer cnx="[::1]:55454 -> [::1]:6650" epoch=0 topic="persistent://public/default/my-topic"
INFO[0000] Created producer cnx="[::1]:55454 -> [::1]:6650" producerID=1 producer_name=standalone-2-0 topic="persistent://public/default/my-topic"
Published message
INFO[0000] Closing producer producerID=1 producer_name=standalone-2-0 topic="persistent://public/default/my-topic"
INFO[0000] Closed producer producerID=1 producer_name=standalone-2-0 topic="persistent://public/default/my-topic"