Home / PostsPost

使用go-kit写出优雅的gRPC微服务

嘟噜聪2020/03/29 11:45:56 [Golang] [go] [go-kit] [gRPC] [PRC] [微服务] 7733人已阅

简介 微服务在这几年一直是大家谈论的话题。微服务是一种软件架构,它将一个大且聚合的业务项目拆解为多个小且独立的业务模块,模块即服务,各服务间使用高效的协议(protobuf、JSON 等)相互调用即是

微服务在这几年一直是大家谈论的话题。微服务是一种软件架构,它将一个大且聚合的业务项目拆解为多个小且独立的业务模块,模块即服务,各服务间使用高效的协议(protobuf、JSON 等)相互调用即是 RPC。这种拆分代码库的方式有以下特点:

  • 每个服务应作为小规模的、独立的业务模块运行,独立部署
  • 每个服务应在进行自动化测试和(分布式)部署时,不影响其他服务
  • 每个服务内部进行细致的错误检查和处理

本章介绍如何通过GRPC的方式来开发微服务项目,同时也需要支持http的RESTful的方式。

简介

Github: https://github.com/icowan/grpc-world

设计一个简单的数据存储的服务,通过get获取数据,通过put设置数据,类似于Redis的key,value存储。

服务有两个API:

  • get: 根据key获取内容
  • put: 根据key设置内容

那么初步定义一个Service实现这两个功能:

type Service interface {
	Get(ctx context.Context, key string) (val string, err error)
	Put(ctx context.Context, key, val string) (err error)
}

以下内容全部通过go-kit的一些组件来实现。

开始前

在创建grpc服务之前需要在您的开发机器上安装proto命令,主要用它来通过.proto文件生成pb文件。

在MacOS安装

$ brew install autoconf automake libtool

因为是基于go语言除了proto之外还需要protoc-gen-go命令:

$ go get -u google.golang.org/grpc
$ cd $GOPATH/bin/
$ ls
protoc-gen-go

需要保证protoc-gen-goGOPATH/bin目录下

开始撸Server端代码

基于go-kit组件来实现server端,go-kit非常适合用来作微服务的组件,它有非常优秀的代码规范能大减少开发人员犯错概率,刚接触时可能会觉得非常复杂,但用久之后你会发现它真的很方便。用go-kit的好处还有,假如以后换成其他框架,在 Go-kit 良好的架构下,我们只需要把 Transport 以及 Endpoint 层剥离,留下 Service 就可以方便集成到新的框架下面了

go-kit主使用了三层结构:

  • Transport: 通信层,可以用各种不同的通信方式,如 HTTP RESTful 接口或者 gRPC 接口(这是个很大的优点,方便切换成任何通信协议)
    • http: http的传输处理
    • grpc: grpc传输处理
  • Endpoint: 终端层,主要实现各种接口的 handler,负责 req/resp 格式的转换
  • Service: 服务层,实现业务逻辑的地方

优雅的目录结构

.
├── Dockerfile
├── Makefile
├── README.md
├── client
│   ├── grpc
│   │   └── client.go
│   └── http
│       └── client.go
├── cmd
│   ├── main.go
│   └── service
│       └── service.go
├── docker-compose.yaml
├── go.mod
├── go.sum
└── pkg
    ├── encode
    │   └── response.go
    ├── endpoint
    │   └── endpoint.go
    ├── grpc
    │   ├── handler.go
    │   └── pb
    │       ├── service.pb.go
    │       └── service.proto
    ├── http
    │   └── handler.go
    ├── repository
    │   └── repository.go
    └── service
        ├── logging.go
        ├── middleware.go
        └── service.go
  • /client/: 我用来演示的demo,可以不用
    • /client/http/: http 的例子
    • /client/grpc/: grpc 的例子
  • /cmd: 存放通过命令行启动的入口
    • /cmd/service/: 真正入口在这里,在这里初始化服务
  • /pkg/: 所有的工具,及service都在这里
    • /pkg/encode/: encode 工具
    • /pkg/endpoint/: go-kit 的端点层
    • /pkg/gprc or http: 传输层,入参、出参都在这里进行处理
    • /pkg/repository/: 仓库,这里我只把数据存内存了没做持久化处理
    • /pkg/service/: 业务逻辑在这里实现

Service

Service是具体的业务实现,这里只需要关注业务逻辑不需要关注框架本身,需要什么就传入什么,也方便以后迁移。

Service定义一个 interface ,并提供所需要实现的方法,若将来有升级或兼容,可以再实现一个Service2,就不再需要修改上层逻辑了,也能向前兼容。

import (
	"context"
	"github.com/go-kit/kit/log"
	"github.com/icowan/grpc-world/pkg/repository"
)
type Service interface {
	Put(ctx context.Context, key, val string) (err error)
}
type service struct {
	logger     log.Logger
	repository repository.Repository
}
func (s *service) Put(ctx context.Context, key, val string) (err error) {
	return s.repository.Put(key, val)
}
func New(logger log.Logger, repository repository.Repository) Service {
	return &service{logger: logger, repository: repository}
}

Endpoint

端点的主要功能是将Transport传过来的Request内容进行类型转换并将数据传到Service及处理返回的内容或转换。简单来说它就是Transport根Service的桥梁。

package endpoint
import (
	"context"
	"github.com/go-kit/kit/endpoint"
	"github.com/icowan/grpc-world/pkg/encode"
	"github.com/icowan/grpc-world/pkg/service"
)
type GetRequest struct {
	Key string `json:"key"`
	Val string `json:"val"`
}
type Endpoints struct {
	GetEndpoint endpoint.Endpoint
}
func NewEndpoint(s service.Service, mdw map[string][]endpoint.Middleware) Endpoints {
	eps := Endpoints{
		GetEndpoint: func(ctx context.Context, request interface{}) (response interface{}, err error) {
			req := request.(GetRequest)
			val, err := s.Get(ctx, req.Key)
			return encode.Response{
				Error: err,
				Data:  val,
			}, err
		},
	}
	for _, m := range mdw["Get"] {
		eps.GetEndpoint = m(eps.GetEndpoint)
	}
	return eps
}

Middleware

中间件的的功能主要用来记录日志、限流、分布式追踪、权限验证等等,每个api可以根据需求定制所需要的中间件。

Logging

logging除了这种写法外也可以类型endpoint.Middleware的那种写法,我这里展示两种模式。

import (
	"context"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"time"
)
type loggingService struct {
	logger log.Logger
	next   Service
}
func NewLoggingService(logger log.Logger, s Service) Service {
	return &loggingService{level.Info(logger), s}
}
func (l *loggingService) Put(ctx context.Context, key, val string) (err error) {
	defer func(begin time.Time) {
		_ = l.logger.Log(
			"method", "Put",
			"key", key,
			"val", val,
			"took", time.Since(begin),
			"err", err,
		)
	}(time.Now())
	return l.next.Put(ctx, key, val)
}

Limiter

下面展示的是限流的中间件例子:

import (
	"context"
	"errors"
	"github.com/go-kit/kit/endpoint"
	"golang.org/x/time/rate"
)
func TokenBucketLimitter(bkt *rate.Limiter) endpoint.Middleware {
	return func(next endpoint.Endpoint) endpoint.Endpoint {
		return func(ctx context.Context, request interface{}) (response interface{}, err error) {
			if !bkt.Allow() {
				return nil, errors.New("Rate limit exceed!")
			}
			return next(ctx, request)
		}
	}
}

Repository

仓库只实现了一个简单的get及put功能,数据没有落地,进程停了数据就没了。

package repository
import (
	"errors"
	"sync"
	"time"
)
type Store struct {
	Key       string
	Val       string
	CreatedAt time.Time
}
type StoreKey string
var ErrUnknown = errors.New("unknown store")
type Repository interface {
	Put(key, val string) error
	Get(key string) (res *Store, err error)
}
type store struct {
	mtx    sync.RWMutex
	stores map[StoreKey]*Store
}

func (s *store) Put(key, val string) error {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	s.stores[StoreKey(key)] = &Store{
		Key:       key,
		Val:       val,
		CreatedAt: time.Now(),
	}
	return nil
}
// 省略一部分
func New() Repository {
	return &store{
		stores: make(map[StoreKey]*Store),
	}
}

Transport

传输层剥离出来之后应用的传输方式就可以随意写义了,可以是HTTP也可以是gRPC或其他方式,它们所对接的都是Endpoint,所以Endpoint及Service都不需要做任何调整就能直接多得传输方式。

HTTP

下面是实现HTTP Transport的实现方式:

import (
	"context"
	"encoding/json"
	kithttp "github.com/go-kit/kit/transport/http"
	"github.com/gorilla/mux"
	"github.com/icowan/grpc-world/pkg/encode"
	ep "github.com/icowan/grpc-world/pkg/endpoint"
	"github.com/pkg/errors"
	"net/http"
)
func MakeHTTPHandler(eps ep.Endpoints, opts ...kithttp.ServerOption) http.Handler {
	r := mux.NewRouter()
	r.Handle("/get/{key}", kithttp.NewServer(
		eps.GetEndpoint,
		decodeGetRequest,
		encode.JsonResponse,
		opts...,
	)).Methods(http.MethodGet)
	return r
}
func decodeGetRequest(_ context.Context, r *http.Request) (request interface{}, err error) {
	vars := mux.Vars(r)
	key, ok := vars["key"]
	if !ok {
		return nil, errors.New("route bad")
	}
	return ep.GetRequest{Key: key}, nil
}

gRPC

gRPC的Transport会比http稍微麻烦一丢丢,主要是gRPC还需要实现一个grpcServerinterface,除此之外与http的实现几乎差不多。

定义proto文件及生成pb文件

在实现grpcServer之前先得定义接口:

syntax = "proto3";
package pb;
service Service {
    rpc Get (GetRequest) returns (ServiceResponse) {}
    rpc Put (GetRequest) returns (ServiceResponse) {}
}
message GetRequest {
    string key = 1;
    string val = 2;
}
message ServiceResponse {
    bool success = 1;
    int64 code = 2;
    string data = 3;
    string err = 4;
}

生成pb文件:

进入pkg/grpc/pb/目录执行:

$ protoc service.proto --go_out==plugins=grpc:.
$ cd .. && tree 
.
├── handler.go
└── pb
    ├── service.pb.go
    └── service.proto
1 directory, 3 files

以下gRPC的实现代码参考:

import (
	"context"
	"github.com/go-kit/kit/transport/grpc"
	"github.com/icowan/grpc-world/pkg/encode"
	ep "github.com/icowan/grpc-world/pkg/endpoint"
	"github.com/icowan/grpc-world/pkg/grpc/pb"
)
type grpcServer struct {
	get grpc.Handler
}
func (g *grpcServer) Get(ctx context.Context, req *pb.GetRequest) (*pb.ServiceResponse, error) {
	_, rep, err := g.get.ServeGRPC(ctx, req)
	if err != nil {
		return nil, err
	}
	return rep.(*pb.ServiceResponse), nil
}
func MakeGRPCHandler(eps ep.Endpoints, opts ...grpc.ServerOption) pb.ServiceServer {
	return &grpcServer{
		get: grpc.NewServer(
			eps.GetEndpoint,
			decodeGetRequest,
			encodeResponse,
			opts...,
		),
	}
}
func decodeGetRequest(_ context.Context, r interface{}) (interface{}, error) {
	return ep.GetRequest{
		Key: r.(*pb.GetRequest).Key,
		Val: r.(*pb.GetRequest).Val,
	}, nil
}
func encodeResponse(_ context.Context, r interface{}) (interface{}, error) {
	resp := r.(encode.Response)
	// ......省略
	return &pb.ServiceResponse{
		Success: resp.Success,
		Code:    int64(resp.Code),
	}, err
}

入口Run()

同时启动HTTP服务及gRPC服务需要启动两个端口,默认启动的是:8080:8081,通过传参可以自定义。

在Run里面需要初始化数据仓库Repository,初发化Service,初始化Endpoint和初始化Transport,初始化完成之后再启动相用两个传输方式。

最后有一个监听退出信号(signal)的功能,可根据需要自行处理。

代码路径: cmd/service/server.go

const rateBucketNum = 20
var (
	logger log.Logger
	fs       = flag.NewFlagSet("world", flag.ExitOnError)
	httpAddr = fs.String("http-addr", ":8080", "HTTP listen address")
	grpcAddr = fs.String("grpc-addr", ":8081", "gRPC listen address")
)
func Run() {
	if err := fs.Parse(os.Args[1:]); err != nil {
		panic(err)
	}
	logger = log.NewLogfmtLogger(os.Stderr)
	store := repository.New() // 初始化仓库
	svc := service.New(logger, store) // 
	svc = service.NewLoggingService(logger, svc)
	ems := []endpoint.Middleware{
		service.TokenBucketLimitter(rate.NewLimiter(rate.Every(time.Second*1), rateBucketNum)), // 限流
	}
	eps := ep.NewEndpoint(svc, map[string][]endpoint.Middleware{
		"Put": ems,
	})
	g := &group.Group{}
	initHttpHandler(eps, g)
	initGRPCHandler(eps, g)
	initCancelInterrupt(g)
	_ = level.Error(logger).Log("exit", g.Run())
}
func initCancelInterrupt(g *group.Group) {
	cancelInterrupt := make(chan struct{})
	g.Add(func() error {
		c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
		select {
		case sig := <-c:
			return fmt.Errorf("received signal %s", sig)
		case <-cancelInterrupt:
			return nil
		}
	}, func(error) {
		close(cancelInterrupt)
	})
}
func initHttpHandler(endpoints ep.Endpoints, g *group.Group) {
	opts := []kithttp.ServerOption{
		kithttp.ServerErrorHandler(transport.NewLogErrorHandler(level.Error(logger))),
		kithttp.ServerErrorEncoder(encode.JsonError),
	}
	httpHandler := http.MakeHTTPHandler(endpoints, opts...)
	httpListener, err := net.Listen("tcp", *httpAddr)
	g.Add(func() error {
		return netHttp.Serve(httpListener, httpHandler)
	}, func(error) {// 略...})

}
func initGRPCHandler(endpoints ep.Endpoints, g *group.Group) {
	grpcOpts := []kitgrpc.ServerOption{
		kitgrpc.ServerErrorHandler(transport.NewLogErrorHandler(logger)),
	}
	grpcListener, err := net.Listen("tcp", *grpcAddr)
	g.Add(func() error {
		baseServer := googleGrpc.NewServer()
		pb.RegisterServiceServer(baseServer, grpc.MakeGRPCHandler(endpoints, grpcOpts...))
		return baseServer.Serve(grpcListener)
	}, func(error) {// 略...)
}

GRPC Client

http 的测试就不贴上来了,通过浏览器直接访问或 curl都可以,重定如何通过grpc调用server端:

import (
	"context"
	"github.com/icowan/grpc-world/pkg/grpc/pb"
	"google.golang.org/grpc"
	"log"
	"time"
)
func main() {
	conn, err := grpc.Dial("127.0.0.1:8081", grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer func() {
		_ = conn.Close()
	}()
	svc := pb.NewServiceClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	r, err := svc.Put(ctx, &pb.GetRequest{
		Key: "hello",
		Val: "world",
	})
	if err != nil {
		log.Fatalf("could not put: %v", err)
	}
	log.Println(r.GetSuccess())
}

测试

$ make run
GO111MODULE=on /usr/local/go/bin/go run ./cmd/main.go -http-addr :8080 -grpc-addr :8081
level=error ts=2020-03-28T10:45:05.923565Z caller=service.go:106 transport=HTTP addr=:8080

执行客户端测试命令:

$ go run ./client/grpc/client.go
$ go run ./client/http/client.go
level=info ts=2020-03-28T10:45:44.793353Z caller=logging.go:41 method=Put key=hello val=world took=2.142µs err=null
level=info ts=2020-03-28T10:45:44.794983Z caller=logging.go:28 method=Get key=hello val=world took=1.248µs err=null
level=info ts=2020-03-28T10:47:02.666247Z caller=logging.go:28 method=Get key=hello val=world took=1.396µs err=null

尾巴

本章所用的测试代码已经更新到了Github上,如果您觉得有参考价值的,可以将代码clone 下来,最好能给个star。

Github: https://github.com/icowan/grpc-world

谢谢了

如果我写的内容对您有用,谢谢大家了

很赞哦! (14)

文章评论

点击排行

本栏推荐

站点信息

  • 微信公众号