Home / PostsPost
使用go-kit写出优雅的gRPC微服务
嘟噜聪2020/03/29 11:45:56
[Golang]
[go]
[go-kit]
[gRPC]
[PRC]
[微服务]
7733人已阅
简介 微服务在这几年一直是大家谈论的话题。微服务是一种软件架构,它将一个大且聚合的业务项目拆解为多个小且独立的业务模块,模块即服务,各服务间使用高效的协议(protobuf、JSON 等)相互调用即是
微服务在这几年一直是大家谈论的话题。微服务是一种软件架构,它将一个大且聚合的业务项目拆解为多个小且独立的业务模块,模块即服务,各服务间使用高效的协议(protobuf、JSON 等)相互调用即是 RPC。这种拆分代码库的方式有以下特点:
- 每个服务应作为小规模的、独立的业务模块运行,独立部署
- 每个服务应在进行自动化测试和(分布式)部署时,不影响其他服务
- 每个服务内部进行细致的错误检查和处理
本章介绍如何通过GRPC的方式来开发微服务项目,同时也需要支持http的RESTful的方式。
简介
Github: https://github.com/icowan/grpc-world
设计一个简单的数据存储的服务,通过get获取数据,通过put设置数据,类似于Redis的key,value存储。
服务有两个API:
get
: 根据key获取内容put
: 根据key设置内容
那么初步定义一个Service实现这两个功能:
type Service interface {
Get(ctx context.Context, key string) (val string, err error)
Put(ctx context.Context, key, val string) (err error)
}
以下内容全部通过go-kit的一些组件来实现。
开始前
在创建grpc服务之前需要在您的开发机器上安装proto命令,主要用它来通过.proto文件生成pb文件。
在MacOS安装
$ brew install autoconf automake libtool
因为是基于go语言除了proto之外还需要protoc-gen-go命令:
$ go get -u google.golang.org/grpc
$ cd $GOPATH/bin/
$ ls
protoc-gen-go
需要保证protoc-gen-go
在GOPATH/bin
目录下
开始撸Server端代码
基于go-kit组件来实现server端,go-kit非常适合用来作微服务的组件,它有非常优秀的代码规范能大减少开发人员犯错概率,刚接触时可能会觉得非常复杂,但用久之后你会发现它真的很方便。用go-kit的好处还有,假如以后换成其他框架,在 Go-kit 良好的架构下,我们只需要把 Transport 以及 Endpoint 层剥离,留下 Service 就可以方便集成到新的框架下面了
go-kit主使用了三层结构:
- Transport: 通信层,可以用各种不同的通信方式,如 HTTP RESTful 接口或者 gRPC 接口(这是个很大的优点,方便切换成任何通信协议)
http
: http的传输处理grpc
: grpc传输处理
- Endpoint: 终端层,主要实现各种接口的 handler,负责 req/resp 格式的转换
- Service: 服务层,实现业务逻辑的地方
优雅的目录结构
.
├── Dockerfile
├── Makefile
├── README.md
├── client
│ ├── grpc
│ │ └── client.go
│ └── http
│ └── client.go
├── cmd
│ ├── main.go
│ └── service
│ └── service.go
├── docker-compose.yaml
├── go.mod
├── go.sum
└── pkg
├── encode
│ └── response.go
├── endpoint
│ └── endpoint.go
├── grpc
│ ├── handler.go
│ └── pb
│ ├── service.pb.go
│ └── service.proto
├── http
│ └── handler.go
├── repository
│ └── repository.go
└── service
├── logging.go
├── middleware.go
└── service.go
/client/
: 我用来演示的demo,可以不用/client/http/
: http 的例子/client/grpc/
: grpc 的例子
/cmd
: 存放通过命令行启动的入口/cmd/service/
: 真正入口在这里,在这里初始化服务
/pkg/
: 所有的工具,及service都在这里/pkg/encode/
: encode 工具/pkg/endpoint/
: go-kit 的端点层/pkg/gprc or http
: 传输层,入参、出参都在这里进行处理/pkg/repository/
: 仓库,这里我只把数据存内存了没做持久化处理/pkg/service/
: 业务逻辑在这里实现
Service
Service是具体的业务实现,这里只需要关注业务逻辑不需要关注框架本身,需要什么就传入什么,也方便以后迁移。
Service定义一个 interface ,并提供所需要实现的方法,若将来有升级或兼容,可以再实现一个Service2,就不再需要修改上层逻辑了,也能向前兼容。
import (
"context"
"github.com/go-kit/kit/log"
"github.com/icowan/grpc-world/pkg/repository"
)
type Service interface {
Put(ctx context.Context, key, val string) (err error)
}
type service struct {
logger log.Logger
repository repository.Repository
}
func (s *service) Put(ctx context.Context, key, val string) (err error) {
return s.repository.Put(key, val)
}
func New(logger log.Logger, repository repository.Repository) Service {
return &service{logger: logger, repository: repository}
}
Endpoint
端点的主要功能是将Transport传过来的Request内容进行类型转换并将数据传到Service及处理返回的内容或转换。简单来说它就是Transport根Service的桥梁。
package endpoint
import (
"context"
"github.com/go-kit/kit/endpoint"
"github.com/icowan/grpc-world/pkg/encode"
"github.com/icowan/grpc-world/pkg/service"
)
type GetRequest struct {
Key string `json:"key"`
Val string `json:"val"`
}
type Endpoints struct {
GetEndpoint endpoint.Endpoint
}
func NewEndpoint(s service.Service, mdw map[string][]endpoint.Middleware) Endpoints {
eps := Endpoints{
GetEndpoint: func(ctx context.Context, request interface{}) (response interface{}, err error) {
req := request.(GetRequest)
val, err := s.Get(ctx, req.Key)
return encode.Response{
Error: err,
Data: val,
}, err
},
}
for _, m := range mdw["Get"] {
eps.GetEndpoint = m(eps.GetEndpoint)
}
return eps
}
Middleware
中间件的的功能主要用来记录日志、限流、分布式追踪、权限验证等等,每个api可以根据需求定制所需要的中间件。
Logging
logging除了这种写法外也可以类型endpoint.Middleware
的那种写法,我这里展示两种模式。
import (
"context"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"time"
)
type loggingService struct {
logger log.Logger
next Service
}
func NewLoggingService(logger log.Logger, s Service) Service {
return &loggingService{level.Info(logger), s}
}
func (l *loggingService) Put(ctx context.Context, key, val string) (err error) {
defer func(begin time.Time) {
_ = l.logger.Log(
"method", "Put",
"key", key,
"val", val,
"took", time.Since(begin),
"err", err,
)
}(time.Now())
return l.next.Put(ctx, key, val)
}
Limiter
下面展示的是限流的中间件例子:
import (
"context"
"errors"
"github.com/go-kit/kit/endpoint"
"golang.org/x/time/rate"
)
func TokenBucketLimitter(bkt *rate.Limiter) endpoint.Middleware {
return func(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
if !bkt.Allow() {
return nil, errors.New("Rate limit exceed!")
}
return next(ctx, request)
}
}
}
Repository
仓库只实现了一个简单的get及put功能,数据没有落地,进程停了数据就没了。
package repository
import (
"errors"
"sync"
"time"
)
type Store struct {
Key string
Val string
CreatedAt time.Time
}
type StoreKey string
var ErrUnknown = errors.New("unknown store")
type Repository interface {
Put(key, val string) error
Get(key string) (res *Store, err error)
}
type store struct {
mtx sync.RWMutex
stores map[StoreKey]*Store
}
func (s *store) Put(key, val string) error {
s.mtx.Lock()
defer s.mtx.Unlock()
s.stores[StoreKey(key)] = &Store{
Key: key,
Val: val,
CreatedAt: time.Now(),
}
return nil
}
// 省略一部分
func New() Repository {
return &store{
stores: make(map[StoreKey]*Store),
}
}
Transport
传输层剥离出来之后应用的传输方式就可以随意写义了,可以是HTTP也可以是gRPC或其他方式,它们所对接的都是Endpoint,所以Endpoint及Service都不需要做任何调整就能直接多得传输方式。
HTTP
下面是实现HTTP Transport的实现方式:
import (
"context"
"encoding/json"
kithttp "github.com/go-kit/kit/transport/http"
"github.com/gorilla/mux"
"github.com/icowan/grpc-world/pkg/encode"
ep "github.com/icowan/grpc-world/pkg/endpoint"
"github.com/pkg/errors"
"net/http"
)
func MakeHTTPHandler(eps ep.Endpoints, opts ...kithttp.ServerOption) http.Handler {
r := mux.NewRouter()
r.Handle("/get/{key}", kithttp.NewServer(
eps.GetEndpoint,
decodeGetRequest,
encode.JsonResponse,
opts...,
)).Methods(http.MethodGet)
return r
}
func decodeGetRequest(_ context.Context, r *http.Request) (request interface{}, err error) {
vars := mux.Vars(r)
key, ok := vars["key"]
if !ok {
return nil, errors.New("route bad")
}
return ep.GetRequest{Key: key}, nil
}
gRPC
gRPC的Transport会比http稍微麻烦一丢丢,主要是gRPC还需要实现一个grpcServer
的 interface
,除此之外与http的实现几乎差不多。
定义proto文件及生成pb文件
在实现grpcServer之前先得定义接口:
syntax = "proto3";
package pb;
service Service {
rpc Get (GetRequest) returns (ServiceResponse) {}
rpc Put (GetRequest) returns (ServiceResponse) {}
}
message GetRequest {
string key = 1;
string val = 2;
}
message ServiceResponse {
bool success = 1;
int64 code = 2;
string data = 3;
string err = 4;
}
生成pb文件:
进入pkg/grpc/pb/
目录执行:
$ protoc service.proto --go_out==plugins=grpc:.
$ cd .. && tree
.
├── handler.go
└── pb
├── service.pb.go
└── service.proto
1 directory, 3 files
以下gRPC的实现代码参考:
import (
"context"
"github.com/go-kit/kit/transport/grpc"
"github.com/icowan/grpc-world/pkg/encode"
ep "github.com/icowan/grpc-world/pkg/endpoint"
"github.com/icowan/grpc-world/pkg/grpc/pb"
)
type grpcServer struct {
get grpc.Handler
}
func (g *grpcServer) Get(ctx context.Context, req *pb.GetRequest) (*pb.ServiceResponse, error) {
_, rep, err := g.get.ServeGRPC(ctx, req)
if err != nil {
return nil, err
}
return rep.(*pb.ServiceResponse), nil
}
func MakeGRPCHandler(eps ep.Endpoints, opts ...grpc.ServerOption) pb.ServiceServer {
return &grpcServer{
get: grpc.NewServer(
eps.GetEndpoint,
decodeGetRequest,
encodeResponse,
opts...,
),
}
}
func decodeGetRequest(_ context.Context, r interface{}) (interface{}, error) {
return ep.GetRequest{
Key: r.(*pb.GetRequest).Key,
Val: r.(*pb.GetRequest).Val,
}, nil
}
func encodeResponse(_ context.Context, r interface{}) (interface{}, error) {
resp := r.(encode.Response)
// ......省略
return &pb.ServiceResponse{
Success: resp.Success,
Code: int64(resp.Code),
}, err
}
入口Run()
同时启动HTTP服务及gRPC服务需要启动两个端口,默认启动的是:8080
和:8081
,通过传参可以自定义。
在Run里面需要初始化数据仓库Repository
,初发化Service
,初始化Endpoint
和初始化Transport
,初始化完成之后再启动相用两个传输方式。
最后有一个监听退出信号(signal
)的功能,可根据需要自行处理。
代码路径: cmd/service/server.go
const rateBucketNum = 20
var (
logger log.Logger
fs = flag.NewFlagSet("world", flag.ExitOnError)
httpAddr = fs.String("http-addr", ":8080", "HTTP listen address")
grpcAddr = fs.String("grpc-addr", ":8081", "gRPC listen address")
)
func Run() {
if err := fs.Parse(os.Args[1:]); err != nil {
panic(err)
}
logger = log.NewLogfmtLogger(os.Stderr)
store := repository.New() // 初始化仓库
svc := service.New(logger, store) //
svc = service.NewLoggingService(logger, svc)
ems := []endpoint.Middleware{
service.TokenBucketLimitter(rate.NewLimiter(rate.Every(time.Second*1), rateBucketNum)), // 限流
}
eps := ep.NewEndpoint(svc, map[string][]endpoint.Middleware{
"Put": ems,
})
g := &group.Group{}
initHttpHandler(eps, g)
initGRPCHandler(eps, g)
initCancelInterrupt(g)
_ = level.Error(logger).Log("exit", g.Run())
}
func initCancelInterrupt(g *group.Group) {
cancelInterrupt := make(chan struct{})
g.Add(func() error {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
select {
case sig := <-c:
return fmt.Errorf("received signal %s", sig)
case <-cancelInterrupt:
return nil
}
}, func(error) {
close(cancelInterrupt)
})
}
func initHttpHandler(endpoints ep.Endpoints, g *group.Group) {
opts := []kithttp.ServerOption{
kithttp.ServerErrorHandler(transport.NewLogErrorHandler(level.Error(logger))),
kithttp.ServerErrorEncoder(encode.JsonError),
}
httpHandler := http.MakeHTTPHandler(endpoints, opts...)
httpListener, err := net.Listen("tcp", *httpAddr)
g.Add(func() error {
return netHttp.Serve(httpListener, httpHandler)
}, func(error) {// 略...})
}
func initGRPCHandler(endpoints ep.Endpoints, g *group.Group) {
grpcOpts := []kitgrpc.ServerOption{
kitgrpc.ServerErrorHandler(transport.NewLogErrorHandler(logger)),
}
grpcListener, err := net.Listen("tcp", *grpcAddr)
g.Add(func() error {
baseServer := googleGrpc.NewServer()
pb.RegisterServiceServer(baseServer, grpc.MakeGRPCHandler(endpoints, grpcOpts...))
return baseServer.Serve(grpcListener)
}, func(error) {// 略...)
}
GRPC Client
http 的测试就不贴上来了,通过浏览器直接访问或 curl都可以,重定如何通过grpc调用server端:
import (
"context"
"github.com/icowan/grpc-world/pkg/grpc/pb"
"google.golang.org/grpc"
"log"
"time"
)
func main() {
conn, err := grpc.Dial("127.0.0.1:8081", grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer func() {
_ = conn.Close()
}()
svc := pb.NewServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r, err := svc.Put(ctx, &pb.GetRequest{
Key: "hello",
Val: "world",
})
if err != nil {
log.Fatalf("could not put: %v", err)
}
log.Println(r.GetSuccess())
}
测试
$ make run
GO111MODULE=on /usr/local/go/bin/go run ./cmd/main.go -http-addr :8080 -grpc-addr :8081
level=error ts=2020-03-28T10:45:05.923565Z caller=service.go:106 transport=HTTP addr=:8080
执行客户端测试命令:
$ go run ./client/grpc/client.go
$ go run ./client/http/client.go
level=info ts=2020-03-28T10:45:44.793353Z caller=logging.go:41 method=Put key=hello val=world took=2.142µs err=null
level=info ts=2020-03-28T10:45:44.794983Z caller=logging.go:28 method=Get key=hello val=world took=1.248µs err=null
level=info ts=2020-03-28T10:47:02.666247Z caller=logging.go:28 method=Get key=hello val=world took=1.396µs err=null
尾巴
本章所用的测试代码已经更新到了Github上,如果您觉得有参考价值的,可以将代码clone 下来,最好能给个star。
Github: https://github.com/icowan/grpc-world
谢谢了
如果我写的内容对您有用,谢谢大家了
很赞哦! (14)