package main import ( "fmt" "math/rand" "strings" zmq "github.com/pebbe/zmq4" ) const ( NBR_CLIENTS int = 10 NBR_WORKERS int = 3 ) func randomString() string { source := "abcdefghijklmnopqrstuvwxyz" target := make([]string, 20) for i := 0; i < 20; i++ { target = append(target, string(source[rand.Intn(len(source))])) } return strings.Join(target, "") } func set_id(socket *zmq.Socket) { socket.SetIdentity(randomString()) // fmt.Println(socket.GetIdentity()) } func client_task() { context, _ := zmq.NewContext() defer context.Term() client, _ := context.NewSocket(zmq.REQ) set_id(client) client.Connect("tcp://localhost:5671") defer client.Close() //fmt.Println("client is running") // Send request, get reply client.Send("HELLO", 0) reply, _ := client.Recv(0) fmt.Println("Client: ", string(reply)) } // While this example runs in a single process, that is just to make // it easier to start and stop the example. Each goroutine has its own // context and conceptually acts as a separate process. // This is the worker task, using a REQ socket to do load-balancing. // Since s_send and s_recv can't handle 0MQ binary identities we // set a printable text identity to allow routing. func worker_task() { context, _ := zmq.NewContext() defer context.Term() worker, _ := context.NewSocket(zmq.REQ) defer worker.Close() set_id(worker) //fmt.Println(worker.GetIdentity()) worker.Connect("tcp://localhost:5672") // Tell broker we're ready for work worker.Send("READY", 0) for { // Read and save all frames until we get an empty frame // In this example there is only 1 but it could be more messageParts, _ := worker.RecvMessage(0) identity := messageParts[0] empty := messageParts[1] request := messageParts[2] fmt.Println("Worker: ", string(request)) worker.Send(identity, zmq.SNDMORE) worker.Send(empty, zmq.SNDMORE) worker.Send("OK", 0) } } // This is the main task. It starts the clients and workers, and then // routes requests between the two layers. Workers signal READY when // they start; after that we treat them as ready when they reply with // a response back to a client. The load-balancing data structure is // just a queue of next available workers. func main() { context, _ := zmq.NewContext() defer context.Term() frontend, _ := context.NewSocket(zmq.ROUTER) defer frontend.Close() frontend.Bind("tcp://*:5671") backend, _ := context.NewSocket(zmq.ROUTER) defer backend.Close() backend.Bind("tcp://*:5672") fmt.Println("broker is running") var client_nbr int var worker_nbr int for client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++ { go client_task() } for worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++ { go worker_task() } // Here is the main loop for the least-recently-used queue. It has two // sockets; a frontend for clients and a backend for workers. It polls // the backend in all cases, and polls the frontend only when there are // one or more workers ready. This is a neat way to use 0MQ's own queues // to hold messages we're not ready to process yet. When we get a client // reply, we pop the next available worker, and send the request to it, // including the originating client identity. When a worker replies, we // re-queue that worker, and we forward the reply to the original client, // using the reply envelope. // Queue of available workers available_workers := 0 var worker_queue []string = make([]string, 0) for { // Initialize poll set newPoller := zmq.NewPoller() newPoller.Add(backend, zmq.POLLIN) // Poll frontend only if we have available workers if available_workers > 0 { newPoller.Add(frontend, zmq.POLLIN) } // Handle worker activity on backend sockets, err := newPoller.Poll(-1) if err != nil { fmt.Println(err.Error()) } else { for _, socket := range sockets { switch s := socket.Socket; s { case frontend: // Now get next client request, route to last-used worker // Client request is [identity][empty][request] parts, _ := frontend.RecvMessage(0) client_id := parts[0] empty := parts[1] request := parts[2] backend.Send(worker_queue[0], zmq.SNDMORE) backend.Send(empty, zmq.SNDMORE) backend.Send(client_id, zmq.SNDMORE) backend.Send(empty, zmq.SNDMORE) backend.Send(request, 0) worker_queue = append(worker_queue[0:0], worker_queue[1:]...) available_workers-- case backend: parts, _ := backend.RecvMessage(0) // Queue worker identity for load-balancing worker_id := parts[0] worker_queue = append(worker_queue, worker_id) available_workers++ // Second frame is empty empty := parts[1] // Third frame is READY or else a client reply identity client_id := parts[2] // If client reply, send rest back to frontend if string(client_id) != "READY" { empty = parts[3] reply := parts[4] frontend.Send(client_id, zmq.SNDMORE) frontend.Send(empty, zmq.SNDMORE) frontend.Send(reply, 0) client_nbr-- if client_nbr == 0 { // Exit after N messages break } } } } } } }