基于 WSGI 协议的 Thrift RPC 方案

李飞 于 2015-11-30

背景

随着项目模块化不断深入,越来越多的模块被拆分到不同的 Git 库中。 模块之间会存在相互的依赖,譬如多个项目都需要读取 User 的信息等。

本地调用可以解决一部分问题,但即便是本地调用也需要依赖大量的外部资源, 如 Database、Redis 等,这些资源又会依赖模块的配置及初始化, 导致调用某些 lib 接口时,还需要关心或引入相应的外部资源配置及初始化。

远程调用可以将配置及初始化过程隔离起来,调用者不需要关系被调用资源的初始化过程, 只需要知道一个网址即可。

Thirft 是我们常用的 RPC 框架,有大量的优秀的特性,如支持多语言,接口明确,序列化,效率高等。 但其相关生态并不是非常友好。Thirft 仅提供了基础的 RPC 功能, 高级的组件如监控、负载均衡、任务调度等等都需要自己实现。

相比 Thrift,围绕 HTTP 协议构建的生态圈却非常完善,有大量的开源软件或组件可以使用, 如 Nginx, uWSGI, Gunicorn 等。这些开源软件或组件为 HTTP 服务提供了诸如负载均衡, 监控,任务调度,失败重试,超时『切腹』机制,ProxyCache,LocalCache,Spooler,自动重启,SSL 等大量高级特性。 基于这些特性,可以非常方便构建出远程调用服务。

本方案将二者结合起来,使得我们的服务同时具有 Thrift 的优点和 HTTP 的生态。

目标

  1. 使用 Thrift RPC 框架开发服务

  2. 享受到 HTTP 协议的生态圈

优点

  1. 采用 WSGI 协议,将 Thrift 的 Processor 包装为一个 HTTP 的服务。

    WSGI 是 Python 界实现 HTTP 协议的标准,各大框架均支持 WSIG 接口。Gunicorn、uWSGI 等都支持 WSGI 协议。

  2. 采用 DNS+ 负载均衡的策略替代服务发现。

    RPC 服务的一个常见的需求是自身的服务注册与服务发现,而 HTTP 服务却很少有提到这类的需求。 这是因为 DNS 本身就是一种服务发现形式,再加上 HTTP服务更强调无状态和负载均衡架构, 不依靠类似于 RPC 服务的 P2P 架构。

    对于本方案来说,调用一个 Thrift RPC 服务只需要知道一个网址即可。

  3. 使用 HTTP 的认证协议

    HTTP 有大量的认证协议可以使用,如:BasicAuth 等,而 Thrift 在这方便完全没有。

  4. 异步调用

    使用基于 Gevent 的 HTTPClient 可以将调用异步化,减少串行 IO 的等待时间; 而服务端的支持则由 Nginx 提供。

  5. HTTP2

    HTTP2 性能更高,功能更多;只需要一个支持 HTTP2 的 WSGI 容器即可实现集成 HTTP2 的功能。

  6. 部署

    随着其他 HTTP 服务一起部署即可。

  7. SSL

    若接口比较敏感,可采用 SSL 来加密调用过程;服务端的支持同样是由 Nginx 提供。

代码

整个代码非常简单,30 多行

from thrift.protocol import TBinaryProtocol
from thrift.transport.TTransport import TTransportBase
from werkzeug.wrappers import get_input_stream


class TIOStreamTransport(TTransportBase):
    """Creates a Thrift Transport from a stream-like object"""
    def __init__(self, input_stream, output_stream):
        self.input_stream = input_stream
        self.output_stream = output_stream

    def isOpen(self):
        return True

    def close(self):
        pass

    def read(self, sz):
        s = self.input_stream.read(sz)
        return s

    def write(self, buf):
        # Hack for werkzeug
        if isinstance(buf, bytearray):
            buf = bytes(buf)
        self.output_stream.append(buf)

    def flush(self):
        pass


def as_wsgi_app(processor):

    def wsgi_app(environ, start_response):
        method = environ.get('REQUEST_METHOD')
        if method != 'POST':
            start_response(b'405 METHOD NOT ALLOWED', [(b'Content-Type', b'text/plain')])
            return [b'Method Not Allowed']

        input_streams = get_input_stream(environ)
        output_streams = []
        transport = TIOStreamTransport(input_streams, output_streams)
        protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
        processor.process(protocol, protocol)
        start_response(b'200 OK', [(b'Content-Type', b'application/thrift')])
        return output_streams

    return wsgi_app
服务端代码
from werkzeug.wsgi import DispatcherMiddleware
from ratak.data.services import DataService, TDataService
app.wsgi_app = DispatcherMiddleware(app.wsgi_app, {
    '/data/services/': as_wsgi_app(TDataService.Processor(DataService())),
})
调用端代码
transport = THttpClient('https://api.xxxxxx.com/data/services/')
protocol = TBinaryProtocol(transport)
client = DataService.Client(protocol)
transport.open()
try:
    client.ping()
finally:
    transport.close()

results matching ""

    No results matching ""