首頁 > 易卦

Go 每日一庫之 rpc:這是標準庫提供的

作者:由 Go中國 發表于 易卦日期:2023-02-01

rpc定位器是什麼

簡介

RPC(Remote Procedure Call)是遠端方法呼叫的縮寫,它可以透過網路呼叫遠端物件的方法。Go 標準庫net/rpc提供了一個

簡單、強大且高效能

的 RPC 實現。僅需編寫很少的程式碼就能實現 RPC 服務。本文就來介紹一下這個庫。

快速使用

標準庫無需安裝。

由於是網路程式,我們需要編寫服務端和客戶端兩個程式。首先是服務端程式:

package mainimport ( “errors” “log” “net” “net/http” “net/rpc”)type Args struct { A, B int}type Quotient struct { Quo, Rem int}type Arith intfunc (t *Arith) Multiply(args *Args, reply *int) error { *reply = args。A * args。B return nil}func (t *Arith) Divide(args *Args, quo *Quotient) error { if args。B == 0 { return errors。New(“divide by 0”) } quo。Quo = args。A / args。B quo。Rem = args。A % args。B return nil}func main() { arith := new(Arith) rpc。Register(arith) rpc。HandleHTTP() if err := http。ListenAndServe(“:1234”, nil); err != nil { log。Fatal(“serve error:”, err) }}

我們定義了一個Arith型別,為它編寫了兩個方法Multiply和Divide。建立Arith型別的物件arith,呼叫rpc。Register(arith)會註冊這兩個方法。rpc庫對註冊的方法有一定的限制,方法必須滿足簽名func (t *T) MethodName(argType T1, replyType *T2) error:

首先,方法必須是匯出的(名字首字母大寫);

其次,方法接受兩個引數,必須是匯出的或內建型別。第一個引數表示客戶端傳遞過來的請求引數,第二個是需要返回給客戶端的響應。第二個引數必須為指標型別(需要修改);

最後,方法必須返回一個error型別的值。返回非nil的值,表示調用出錯。

rpc。HandleHTTP()註冊 HTTP 路由。http。ListenAndServe(“:1234”, nil)在埠1234上啟動一個 HTTP 服務,請求 rpc 方法會交給rpc內部路由處理。這樣我們就可以透過客戶端呼叫這兩個方法了:

package mainimport ( “fmt” “log” “net/rpc”)type Args struct { A, B int}type Quotient struct { Quo, Rem int}func main() { client, err := rpc。DialHTTP(“tcp”, “:1234”) if err != nil { log。Fatal(“dialing:”, err) } args := &Args{7, 8} var reply int err = client。Call(“Arith。Multiply”, args, &reply) if err != nil { log。Fatal(“Multiply error:”, err) } fmt。Printf(“Multiply: %d*%d=%d\n”, args。A, args。B, reply) args = &Args{15, 6} var quo Quotient err = client。Call(“Arith。Divide”, args, &quo) if err != nil { log。Fatal(“Divide error:”, err) } fmt。Printf(“Divide: %d/%d=%d。。。%d\n”, args。A, args。B, quo。Quo, quo。Rem)}

客戶端比服務端稍微簡單一點,我們使用rpc。DialHTTP(“tcp”, “:1234”)連線到服務端的監聽地址,返回一個 rpc 的客戶端物件。後續就可以呼叫該物件的Call()方法呼叫服務端物件的對應方法,依次傳入方法名(需要加上型別限定)、引數、一個指標(用於接收返回值)。首先執行服務端程式:

$ go run main。go

然後在一個新的控制檯中執行客戶端程式,輸出:

$ go run client。goMultiply: 7*8=56Divide: 15/6=2。。。3

對net/http包不熟悉的童鞋可能會覺得奇怪,rpc。HandleHTTP()與http。ListenAndServer(“:1234”, nil)是怎麼聯絡起來的?我們簡單看一下原始碼:

// src/net/rpc/server。goconst ( // Defaults used by HandleHTTP DefaultRPCPath = “/_goRPC_” DefaultDebugPath = “/debug/rpc”)func (server *Server) HandleHTTP(rpcPath, debugPath string) { http。Handle(rpcPath, server) http。Handle(debugPath, debugHTTP{server})}func HandleHTTP() { DefaultServer。HandleHTTP(DefaultRPCPath, DefaultDebugPath)}

實際上,rpc。HandleHTTP()會呼叫http。Handle()在預定義的路徑上(/_goRPC_)註冊處理器。這個處理器最終被新增到net/http包中的預設多路複用器上:

// src/net/http/server。gofunc Handle(pattern string, handler Handler) { DefaultServeMux。Handle(pattern, handler)}

而http。ListenAndServer()第二個引數傳入nil時也是使用預設的多路複用器。具體可以看看我之前的文章Go Web 程式設計之 程式結構。

細心的朋友可能發現了,除了預設的路徑/_goRPC_用來處理 RPC 請求,rpc。HandleHTTP()方法還註冊了一個除錯路徑/debug/rpc。我們可以直接在瀏覽器中訪問這個網址(需要服務端程式開啟。如果服務端在遠端,需要相應地修改地址)localhost:1234,直觀的檢視各個方法的呼叫情況:

Go 每日一庫之 rpc:這是標準庫提供的

非同步呼叫

上面的例子中,我們在客戶端使用了同步的呼叫方式,即一直等待服務端的響應或出錯。在等待的過程中,客戶端就不能處理其它的任務了。當然,我們也可以採用非同步的呼叫方式:

func main() { client, err := rpc。DialHTTP(“tcp”, “:1234”) if err != nil { log。Fatal(“dialing:”, err) } args1 := &Args{7, 8} var reply int multiplyReply := client。Go(“Arith。Multiply”, args1, &reply, nil) args2 := &Args{15, 6} var quo Quotient divideReply := client。Go(“Arith。Divide”, args2, &quo, nil) ticker := time。NewTicker(time。Millisecond) defer ticker。Stop() var multiplyReplied, divideReplied bool for !multiplyReplied || !divideReplied { select { case replyCall := <-multiplyReply。Done: if err := replyCall。Error; err != nil { fmt。Println(“Multiply error:”, err) } else { fmt。Printf(“Multiply: %d*%d=%d\n”, args1。A, args1。B, reply) } multiplyReplied = true case replyCall := <-divideReply。Done: if err := replyCall。Error; err != nil { fmt。Println(“Divide error:”, err) } else { fmt。Printf(“Divide: %d/%d=%d。。。%d\n”, args2。A, args2。B, quo。Quo, quo。Rem) } divideReplied = true case <-ticker。C: fmt。Println(“tick”) } }}

非同步呼叫使用client。Go()方法,引數與同步呼叫基本一樣。它返回一個rpc。Call物件:

// src/net/rpc/client。gotype Call struct { ServiceMethod string Args interface{} Reply interface{} Error error Done chan *Call }

我們可以透過該物件獲取此次呼叫的資訊,如方法名、引數、返回值和錯誤。我們透過監聽通道Done是否有值判斷呼叫是否完成。上面程式碼中使用一個select語句輪詢兩次呼叫的狀態。注意一點,

如果多個通道都有值,select執行哪個case是隨機的

。所以可能先輸出divide的資訊:

$ go run client。go Divide: 15/6=2。。。3Multiply: 7*8=56

服務端可以繼續使用一開始的。

定製方法名

預設情況下,rpc。Register()將方法接收者(receiver)的型別名作為方法名字首。我們也可以自己設定。這時需要呼叫RegisterName(name string, rcvr interface{}) error方法:

func main() { arith := new(Arith) rpc。RegisterName(“math”, arith) rpc。HandleHTTP() if err := http。ListenAndServe(“:1234”, nil); err != nil { log。Fatal(“serve error:”, err) }}

上面我們將註冊的方法名字首改為math了,客戶端呼叫時傳入的方法名也需要相應的修改:

func main() { client, err := rpc。DialHTTP(“tcp”, “:1234”) if err != nil { log。Fatal(“dialing:”, err) } args := &Args{7, 8} var reply int err = client。Call(“math。Multiply”, args, &reply) if err != nil { log。Fatal(“Multiply error:”, err) } fmt。Printf(“Multiply: %d*%d=%d\n”, args。A, args。B, reply)}TCP

上面我們都是使用 HTTP 協議來實現 rpc 服務的,rpc庫也支援直接使用 TCP 協議。首先,服務端先呼叫net。Listen(“tcp”, “:1234”)建立一個監聽某個 TCP 埠的監聽器(Accepter),然後使用rpc。Accept(l)在此監聽器上接受連線並處理:

func main() { l, err := net。Listen(“tcp”, “:1234”) if err != nil { log。Fatal(“listen error:”, err) } arith := new(Arith) rpc。Register(arith) rpc。Accept(l)}

然後,客戶端呼叫rpc。Dial()以 TCP 協議連線到服務端:

func main() { client, err := rpc。Dial(“tcp”, “:1234”) if err != nil { log。Fatal(“dialing:”, err) } args := &Args{7, 8} var reply int err = client。Call(“Arith。Multiply”, args, &reply) if err != nil { log。Fatal(“Multiply error:”, err) } fmt。Printf(“Multiply: %d*%d=%d\n”, args。A, args。B, reply)}自己接收連線

我們可以自己接受連線,然後在此連線上應用 rpc 協議:

func main() { l, err := net。Listen(“tcp”, “:1234”) if err != nil { log。Fatal(“listen error:”, err) } arith := new(Arith) rpc。Register(arith) for { conn, err := l。Accept() if err != nil { log。Fatal(“accept error:”, err) } go rpc。ServeConn(conn) }}

這個客戶端與上面 TCP 的客戶端一樣,不用修改。

自定義編碼格式

預設客戶端與服務端之間的資料使用gob編碼,我們可以使用其它的格式來編碼。在服務端,我們要實現rpc。ServerCodec介面:

// src/net/rpc/server。gotype ServerCodec interface { ReadRequestHeader(*Request) error ReadRequestBody(interface{}) error WriteResponse(*Response, interface{}) error Close() error}

實際上不用這麼麻煩,我們檢視原始碼看看gobServerCodec是怎麼實現的,然後仿造實現一個就行了。下面我實現了一個 JSON 格式的編解碼器:

type JsonServerCodec struct { rwc io。ReadWriteCloser dec *json。Decoder enc *json。Encoder encBuf *bufio。Writer closed bool}func NewJsonServerCodec(conn io。ReadWriteCloser) *JsonServerCodec { buf := bufio。NewWriter(conn) return &JsonServerCodec{conn, json。NewDecoder(conn), json。NewEncoder(buf), buf, false}}func (c *JsonServerCodec) ReadRequestHeader(r *rpc。Request) error { return c。dec。Decode(r)}func (c *JsonServerCodec) ReadRequestBody(body interface{}) error { return c。dec。Decode(body)}func (c *JsonServerCodec) WriteResponse(r *rpc。Response, body interface{}) (err error) { if err = c。enc。Encode(r); err != nil { if c。encBuf。Flush() == nil { log。Println(“rpc: json error encoding response:”, err) c。Close() } return } if err = c。enc。Encode(body); err != nil { if c。encBuf。Flush() == nil { log。Println(“rpc: json error encoding body:”, err) c。Close() } return } return c。encBuf。Flush()}func (c *JsonServerCodec) Close() error { if c。closed { return nil } c。closed = true return c。rwc。Close()}func main() { l, err := net。Listen(“tcp”, “:1234”) if err != nil { log。Fatal(“listen error:”, err) } arith := new(Arith) rpc。Register(arith) for { conn, err := l。Accept() if err != nil { log。Fatal(“accept error:”, err) } go rpc。ServeCodec(NewJsonServerCodec(conn)) }}

在for迴圈中需要建立編解碼器JsonServerCodec傳給ServeCodec方法。同樣的,客戶端要實現rpc。ClientCodec介面,也是仿造gobClientCodec的實現:

type JsonClientCodec struct { rwc io。ReadWriteCloser dec *json。Decoder enc *json。Encoder encBuf *bufio。Writer}func NewJsonClientCodec(conn io。ReadWriteCloser) *JsonClientCodec { encBuf := bufio。NewWriter(conn) return &JsonClientCodec{conn, json。NewDecoder(conn), json。NewEncoder(encBuf), encBuf}}func (c *JsonClientCodec) WriteRequest(r *rpc。Request, body interface{}) (err error) { if err = c。enc。Encode(r); err != nil { return } if err = c。enc。Encode(body); err != nil { return } return c。encBuf。Flush()}func (c *JsonClientCodec) ReadResponseHeader(r *rpc。Response) error { return c。dec。Decode(r)}func (c *JsonClientCodec) ReadResponseBody(body interface{}) error { return c。dec。Decode(body)}func (c *JsonClientCodec) Close() error { return c。rwc。Close()}func main() { conn, err := net。Dial(“tcp”, “:1234”) if err != nil { log。Fatal(“dial error:”, err) } client := rpc。NewClientWithCodec(NewJsonClientCodec(conn)) args := &Args{7, 8} var reply int err = client。Call(“Arith。Multiply”, args, &reply) if err != nil { log。Fatal(“Multiply error:”, err) } fmt。Printf(“Multiply: %d*%d=%d\n”, args。A, args。B, reply)}

要使用NewClientWithCodec以指定的編解碼器建立客戶端。

自定義伺服器

實際上,上面我們呼叫的方法rpc。Register,rpc。RegisterName,rpc。ServeConn,rpc。ServeCodec都是轉而去呼叫預設DefaultServer的相關方法:

// src/net/rpc/server。govar DefaultServer = NewServer()func Register(rcvr interface{}) error { return DefaultServer。Register(rcvr) }func RegisterName(name string, rcvr interface{}) error { return DefaultServer。RegisterName(name, rcvr)}func ServeConn(conn io。ReadWriteCloser) { DefaultServer。ServeConn(conn)}func ServeCodec(codec ServerCodec) { DefaultServer。ServeCodec(codec)}

但是因為DefaultServer是全域性共享的,如果有第三方庫使用了相關方法,並且註冊了一些物件的方法,我們引用這個第三方庫之後,就出現兩個問題。第一,可能與我們註冊的方法衝突;第二,帶來額外的安全隱患(庫中方法直接panic?)。故而推薦做法是自己NewServer:

func main() { arith := new(Arith) server := rpc。NewServer() server。RegisterName(“math”, arith) server。HandleHTTP(rpc。DefaultRPCPath, rpc。DefaultDebugPath) if err := http。ListenAndServe(“:1234”, nil); err != nil { log。Fatal(“serve error:”, err) }}

這其實是一個套路,很多庫會提供一個預設的實現直接使用,如log、net/http這些庫。但是也提供了建立和自定義的方法。一般測試時為了方便可以使用預設實現,實踐中最好自己建立相應的物件,避免干擾和安全問題。

總結

本文介紹了 Go 標準庫中的rpc,它使用非常簡單,效能異常強大。很多rpc的第三方庫都是對rpc的封裝,早期版本的rpcx就是基於rpc做的封裝。

大家如果發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue

參考

rpc 官方文件https://golang。org/pkg/net/rpc/

Go 每日一庫 GitHub:https://github。com/darjun/go-daily-lib