Distributed service governance with zookeeper and redis

2019-01-01 20:20:54   Last updated: 2019-01-01 20:20:54




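Microservices have become the most popular distributed architecture today.

A system is split into a number of services, with the business cut both horizontally and vertically, yielding highly cohesive, loosely coupled microservices that together form a microservice architecture.

A microservice architecture has clear advantages in maintainability and division of responsibility, which makes systems easier to build and maintain, and problems faster to respond to and resolve.

However, microservice architectures are hard to govern. There are many services, each served by several machines, so how to monitor the health of every machine in real time and how to switch over and scale out services smoothly are questions that must be answered before the architecture is built.

Java-based RPC frameworks, including Alibaba's open-source dubbo and sofa, Tencent's open-source TARS, and Sina Weibo's open-source motan, all offer fairly powerful service governance such as service registration, discovery, and horizontal scaling, but at the time of writing python has no good open-source solution.

In this article we use thrift, zookeeper, and redis to build a basic python service governance solution. The main problems we address are:

  1. Service registration and discovery
  2. Load balancing across server nodes
  3. Dynamic weight adjustment for service providers
  4. Manually stopping and starting services
  5. Service call monitoring
  6. Simplified use of python thrift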

 

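In a microservice environment, services can talk to each other over plain RESTful HTTP, but that brings several problems:

  1. Load balancing relies mainly on nginx or similar, which is hard to monitor and control
  2. Switching individual service nodes on and off by hand is difficult
  3. Dependencies between services are unclear
  4. Development and maintenance depend heavily on documentation
  5. Interface conventions are neither guaranteed nor enforced
  6. Callers have to handle encoding, transport, and decoding themselves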

 

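RPC emerged in response to these problems. The mature, cross-language RPC systems with the best overall performance today are mainly gRPC and thrift, open-source products of Google and Apache respectively.

When a microservice architecture is first put together with thrift or gRPC, the simplest approach to governance is to simply expose and reference remote services with tools such as RMI or Hessian. This still leaves the problems above, such as unclear dependencies between services and no easy way to switch service nodes on and off by hand, so building a service governance platform is essential.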

 

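As the bridge between service providers and service consumers, a service governance platform needs at least the following:

  1. Service registration management
  2. Request routing
  3. Service status monitoring
  4. Interface status monitoring
  5. Manual on/off control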

 

 

 

 

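With zookeeper's EPHEMERAL nodes, service instances can be registered and removed automatically with very little effort, which hides the concrete service providers from clients and enables dynamic traffic control and load balancing.

Synchronizing with Redis lets us save and load user settings, which allows richer custom features.

A front-end page gives users a friendly interface for viewing and operating service monitoring, switches, and so on.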

 

Directory tree

 

We write a python package that each service imports and uses.

 

server.py

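server.py is the python file that service providers call.

We use an appkey to prevent the confusion that would arise if several services provided the same service_name.

A mysql unique key guarantees the appkey's uniqueness, and the appkey is also used to maintain further persistent information about the service.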
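The appkey check can be sketched with the standard-library sqlite3 module standing in for MySQL. The table name serviceapp and the column appkey are taken from the query in server.py; the rest of the schema and the helper names register_appkey and is_registered are assumptions made for illustration.

```python
import sqlite3


def create_schema(conn):
    # Hypothetical minimal schema; the UNIQUE constraint plays the role of
    # the MySQL unique key that keeps every appkey distinct.
    conn.execute(
        "CREATE TABLE serviceapp ("
        "id INTEGER PRIMARY KEY, "
        "appkey TEXT NOT NULL UNIQUE)"
    )


def register_appkey(conn, appkey):
    """Insert an appkey; return False if it is already taken."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute("INSERT INTO serviceapp (appkey) VALUES (?)", (appkey,))
        return True
    except sqlite3.IntegrityError:
        return False


def is_registered(conn, appkey):
    """The same kind of count check that start_service runs before serving."""
    row = conn.execute(
        "SELECT count(*) FROM serviceapp WHERE appkey = ?", (appkey,)
    ).fetchone()
    return row[0] > 0


conn = sqlite3.connect(":memory:")
create_schema(conn)
print(register_appkey(conn, "hello_www"))   # True
print(register_appkey(conn, "hello_www"))   # False: the appkey is taken
print(is_registered(conn, "unknown_app"))   # False
```

Letting the database enforce uniqueness means two teams cannot claim the same appkey even if they register at the same moment.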

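A service provider only needs to import the probe package and call start_service to register and serve.

We also use the hostname to distinguish the online and offline environments and keep them isolated. In production, of course, a stricter way of determining the environment can be defined.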
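The hostname rule can be isolated into a small function. This is a minimal sketch using the same pattern as server.py; the function name detect_env and the sample hostnames are made up for the example.

```python
import re
import socket

PROD_HOST_PATTERN = re.compile(r"^prod\d+$")


def detect_env(hostname=None):
    """Return 'prod' for hosts named like prod01, otherwise 'dev'."""
    if hostname is None:
        hostname = socket.getfqdn(socket.gethostname())
    return "prod" if PROD_HOST_PATTERN.match(hostname) else "dev"


print(detect_env("prod12"))      # prod
print(detect_env("prod12.idc"))  # dev: the pattern must match the whole name
print(detect_env("my-laptop"))   # dev
```

The second call shows one reason a stricter rule may be wanted in production: socket.getfqdn can return a fully qualified name, and a name with a domain suffix no longer matches the pattern.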

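As described above, zookeeper's EPHEMERAL nodes give us automatic registration and removal. Before creating the node, we first fetch the node's information from redis so that the settings from its previous run are carried over.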
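The "load the previous settings, otherwise store defaults" step can be sketched without a running Redis. The field layout and default values follow server.py; FakeCache is an in-memory stand-in invented for this example, and load_node_settings is a hypothetical helper name.

```python
import json

DEFAULT_WEIGHT = 10


def node_field(appkey, env, service_name, host, port):
    """Redis hash field identifying one provider instance (as in server.py)."""
    return "|".join([appkey, env, service_name, host, str(port)])


def load_node_settings(cache, redis_key, field, host, port):
    """Return saved settings for the node, creating defaults on first start.

    `cache` is anything with redis-style hget/hset methods.
    """
    raw = cache.hget(redis_key, field)
    if raw is not None:
        return json.loads(raw)
    settings = {"host": host, "port": port, "weight": DEFAULT_WEIGHT, "valid": 1}
    cache.hset(redis_key, field, json.dumps(settings))
    return settings


class FakeCache:
    """In-memory stand-in for a Redis hash, for illustration only."""

    def __init__(self):
        self._data = {}

    def hget(self, key, field):
        return self._data.get((key, field))

    def hset(self, key, field, value):
        self._data[(key, field)] = value


cache = FakeCache()
field = node_field("hello_www", "dev", "DemoService", "10.0.0.5", 8081)
first = load_node_settings(cache, "thriftprobe_dev", field, "10.0.0.5", 8081)

# An operator lowers the weight while the node is running.
first["weight"] = 3
cache.hset("thriftprobe_dev", field, json.dumps(first))

# After a restart the node comes back with the adjusted weight.
restarted = load_node_settings(cache, "thriftprobe_dev", field, "10.0.0.5", 8081)
print(restarted["weight"])  # 3
```

Because the settings live in Redis rather than in the ephemeral node, a manual weight or on/off change survives a restart of the provider.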

 

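  • Note that if a connection is dropped abnormally without being closed explicitly, zookeeper only detects and removes the EPHEMERAL node once the session timeout elapses. If the session timeout is set too large, a dead node therefore lingers in zookeeper for a long time and is mistaken for a live one.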

 

import json
import logging
import os
import re
import socket

import pymysql
import thriftpy2
import yaml
from kazoo.client import KazooClient
from redis import Redis
from rediscluster import StrictRedisCluster
from thriftpy2 import rpc


def start_service(thrift_path, appkey, service_name, handler, port=8081):
    # service_name becomes part of the zookeeper node name, so restrict its characters
    if not isinstance(service_name, str) or re.match(r'^\w+$', service_name) is None:
        raise Exception('invalid service_name (' + str(service_name) +
                        '): only letters, digits and underscores are allowed')

    try:
        # hosts named like prod01 are production, everything else is dev
        hostname = socket.getfqdn(socket.gethostname())
        env = 'dev'
        if re.match(r'^prod\d+$', hostname) is not None:
            env = 'prod'

        if env == 'prod':
            logpath = '/var/log/thriftprobe'
            os.makedirs(logpath, exist_ok=True)
            logpath = logpath + '/appkey_' + service_name + '_server.log'
        else:
            logpath = None  # log to stderr outside production
        logging.basicConfig(filename=logpath, level=logging.INFO,
                            format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')

        filename = os.path.join(os.path.dirname(__file__), 'config', 'config.yml')
        with open(filename, encoding='utf-8') as yamlfd:
            config = yaml.safe_load(yamlfd)

        # refuse to start unless the appkey has been registered in mysql
        dbclient = pymysql.connect(host=config['mysql'][env]['host'],
                                   port=config['mysql'][env]['port'],
                                   user=config['mysql'][env]['username'],
                                   database=config['mysql'][env]['database'],
                                   password=config['mysql'][env]['password'],
                                   connect_timeout=1)
        try:
            sql = 'SELECT count(*) as total from serviceapp WHERE appkey = %s'
            cursor = dbclient.cursor(pymysql.cursors.DictCursor)
            cursor.execute(sql, [appkey])
            total = cursor.fetchone()
        finally:
            dbclient.close()
        if total is None or total['total'] <= 0:
            raise Exception('appkey - ' + appkey + ' is not registered')

        module_name = appkey + '_thrift'
        service_thrift = thriftpy2.load(thrift_path, module_name=module_name)
        server = rpc.make_server(getattr(service_thrift, service_name), handler, '0.0.0.0', port)
        logging.info('starting thrift service [appkey:' + appkey + ', service_name:' + service_name +
                     ', port:' + str(port) + ']')
        __zk_connect(config, env, appkey, service_name, port)
        server.serve()
    except Exception as e:
        logging.error(e)
        raise


def __zk_connect(config, env, appkey, service_name, port):
    local_ip = socket.gethostbyname(socket.getfqdn(socket.gethostname()))

    if config['redis'][env]['cluster'] == 1:
        redis_nodes = [{'host': config['redis'][env]['host'], 'port': config['redis'][env]['port']}]
        redis_instance = StrictRedisCluster(startup_nodes=redis_nodes,
                                            password=config['redis'][env]['password'],
                                            socket_connect_timeout=1)
    else:
        redis_instance = Redis(host=config['redis'][env]['host'],
                               port=config['redis'][env]['port'],
                               password=config['redis'][env]['password'])

    redis_key = 'thriftprobe_' + env
    node_field = appkey + '|' + env + '|' + service_name + '|' + local_ip + '|' + str(port)

    # reuse the settings saved by this node's previous run, if there are any
    nodeinfos = None
    try:
        cached = redis_instance.hget(redis_key, node_field)
        if cached is not None:
            nodeinfos = json.loads(cached)
    except Exception as e:
        logging.warning('failed to load node settings from cache: ' + str(e))

    if nodeinfos is None:
        nodeinfos = {
            'host': local_ip,
            'port': port,
            'weight': 10,
            'valid': 1
        }
        redis_instance.hset(redis_key, node_field, json.dumps(nodeinfos))
        logging.info('saved node settings to cache: ' + node_field + ' - ' + json.dumps(nodeinfos))
    else:
        logging.info('loaded node settings from cache: ' + node_field + ' - ' + json.dumps(nodeinfos))

    host = config['zkserver'][env]['host'] + ':' + str(config['zkserver'][env]['port'])
    zk = KazooClient(hosts=host)
    zk.start(200)
    full_name = '/services' + '/' + env + '/' + appkey
    if zk.exists(full_name) is None:
        zk.create(full_name, makepath=True)
    # the ephemeral node disappears when this process's zookeeper session ends
    zk.create(full_name + '/' + service_name + '_' + local_ip + '_' + str(port),
              value=bytes(json.dumps(nodeinfos), encoding='utf-8'), ephemeral=True)
    logging.info('node created: ' + json.dumps(nodeinfos))

 

 

client.py

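client.py has one simple job: creating the thrift TClient object. We override TClient here so that the server address is resolved dynamically on every interface call.

Users just call build_client to create the TClient object and invoke interfaces.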

import logging
import os
import re
import socket

import thriftpy2

# import path matches the package name declared in setup.py
from probe.ProbTClient import ProbTClient


def build_client(thrift_path, appkey, service_name, socket_timeout=500):
    # hosts named like prod01 are production, everything else is dev
    hostname = socket.getfqdn(socket.gethostname())
    env = 'dev'
    if re.match(r'^prod\d+$', hostname) is not None:
        env = 'prod'

    if env == 'prod':
        logpath = '/var/log/thriftprobe'
        os.makedirs(logpath, exist_ok=True)
        logpath = logpath + '/' + appkey + '_' + service_name + '_client.log'
    else:
        logpath = None  # log to stderr outside production
    logging.basicConfig(filename=logpath, level=logging.INFO,
                        format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')

    module_name = appkey + '_thrift'
    client_thrift = thriftpy2.load(thrift_path, module_name=module_name)
    service = getattr(client_thrift, service_name)
    return ProbTClient(env, service, appkey, service_name, socket_timeout)
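The object returned by build_client exposes every interface of the service as a method without declaring any of them. A minimal sketch of that dispatch pattern, with no thrift involved, might look like this; CallProxy and fake_transport are hypothetical names, and the transport function stands in for "pick a node, connect, send, receive".

```python
import functools


class CallProxy:
    """Turns attribute access into calls routed through one function."""

    def __init__(self, api_names, transport):
        self._api_names = set(api_names)
        self._transport = transport  # callable(api, *args, **kwargs)

    def __getattr__(self, api):
        # Only reached when normal attribute lookup fails.
        if api in self._api_names:
            return functools.partial(self._transport, api)
        raise AttributeError(
            "{} instance has no attribute '{}'".format(type(self).__name__, api)
        )


def fake_transport(api, *args, **kwargs):
    # Hypothetical stand-in for the real network round trip.
    return "called {} with {}".format(api, args)


client = CallProxy(["say"], fake_transport)
print(client.say())  # called say with ()
```

Routing every call through a single function is what makes it possible to choose a fresh server address per request.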

 

 

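ProbTClient.py

ProbTClient is our rewritten thrift TClient. On every interface call it picks one available node at random, in proportion to its weight, from the server nodes currently alive in zk, so the target address is assigned dynamically for each call.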
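The weighted random choice can be looked at on its own. The sketch below follows the same cumulative-weight idea as ProbTClient; the function name pick_node, the extra filter on non-positive weights, and the sample addresses are assumptions for the example.

```python
import random


def pick_node(nodes, rng=random):
    """Pick one valid node with probability proportional to its weight."""
    # Skipping weight <= 0 is an added guard, not part of the original code.
    candidates = [n for n in nodes if n["valid"] == 1 and n["weight"] > 0]
    if not candidates:
        raise LookupError("no valid node available")
    total_weight = sum(n["weight"] for n in candidates)
    ticket = rng.randint(1, total_weight)  # inclusive on both ends
    running = 0
    for node in candidates:
        running += node["weight"]
        if ticket <= running:
            return node
    raise AssertionError("unreachable: ticket never exceeds total_weight")


nodes = [
    {"host": "10.0.0.1", "port": 8081, "weight": 10, "valid": 1},
    {"host": "10.0.0.2", "port": 8081, "weight": 30, "valid": 1},
    {"host": "10.0.0.3", "port": 8081, "weight": 50, "valid": 0},  # switched off
]

rng = random.Random(42)
hits = {}
for _ in range(4000):
    host = pick_node(nodes, rng)["host"]
    hits[host] = hits.get(host, 0) + 1
print(hits)
```

Over many calls the second node should receive roughly three times the traffic of the first, and the node with valid set to 0 receives none, which is how the manual switch and the dynamic weight take effect.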

import functools
import json
import logging
import os
import random
import uuid

import yaml
from kazoo.client import KazooClient
from thriftpy2.protocol import TBinaryProtocolFactory
from thriftpy2.thrift import TApplicationException, TMessageType, args2kwargs
from thriftpy2.transport import TSocket, TBufferedTransportFactory


class ProbTClient(object):
    def __init__(self, env, service, appkey, service_name, socket_timeout):
        self._service = service
        self._appkey = appkey
        self._service_name = service_name
        self._env = env
        self._seqid = 0
        self._iprot = None
        self._oprot = None
        self._uuid = None
        self._socket_timeout = socket_timeout

        if env == 'prod':
            logpath = '/var/log/thriftprobe'
            os.makedirs(logpath, exist_ok=True)
            logpath = logpath + '/appkey_' + service_name + '_client.log'
        else:
            logpath = None  # log to stderr outside production
        logging.basicConfig(filename=logpath, level=logging.INFO,
                            format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')

    def __getattr__(self, _api):
        if _api in self._service.thrift_services:
            return functools.partial(self._req, _api)

        raise AttributeError("{} instance has no attribute '{}'".format(
            self.__class__.__name__, _api))

    def __dir__(self):
        return self._service.thrift_services

    def _req(self, _api, *args, **kwargs):
        # one id per call, set before the first log line that uses it
        self._uuid = str(uuid.uuid1())
        logging.info(self._service_name + '.' + _api + '[' + self._uuid + ']' + ' call started')
        try:
            _kw = args2kwargs(getattr(self._service, _api + "_args").thrift_spec, *args)
            kwargs.update(_kw)
            result_cls = getattr(self._service, _api + "_result")

            self._connect()
            self._send(_api, **kwargs)
            # wait result only if non-oneway
            if not getattr(result_cls, "oneway"):
                return self._recv(_api)
        finally:
            logging.info(self._service_name + '.' + _api + '[' + self._uuid + ']' + ' call finished')
            self.close()

    def _send(self, _api, **kwargs):
        self._oprot.write_message_begin(_api, TMessageType.CALL, self._seqid)
        args = getattr(self._service, _api + "_args")()
        for k, v in kwargs.items():
            setattr(args, k, v)
        args.write(self._oprot)
        self._oprot.write_message_end()
        self._oprot.trans.flush()

    def _recv(self, _api):
        logging.info(self._service_name + '.' + _api + '[' + self._uuid + ']' + ' reading response')
        try:
            fname, mtype, rseqid = self._iprot.read_message_begin()
            if mtype == TMessageType.EXCEPTION:
                x = TApplicationException()
                x.read(self._iprot)
                self._iprot.read_message_end()
                raise x
            result = getattr(self._service, _api + "_result")()
            result.read(self._iprot)
            self._iprot.read_message_end()

            if hasattr(result, "success") and result.success is not None:
                return result.success

            # void api without throws
            if len(result.thrift_spec) == 0:
                return

            # check throws
            for k, v in result.__dict__.items():
                if k != "success" and v:
                    raise v

            # no throws & not void api
            if hasattr(result, "success"):
                raise TApplicationException(TApplicationException.MISSING_RESULT)
        finally:
            logging.info(self._service_name + '.' + _api + '[' + self._uuid + ']' + ' response read')

    def _connect(self):
        host, port = self._get_service_addr()
        sock = TSocket(host, port, socket_timeout=self._socket_timeout)
        transport = TBufferedTransportFactory().get_transport(sock)
        protocol = TBinaryProtocolFactory().get_protocol(transport)
        transport.open()
        self._iprot = self._oprot = protocol

    def _get_service_addr(self):
        # resolve the config file relative to this package, as server.py does
        filename = os.path.join(os.path.dirname(__file__), 'config', 'config.yml')
        with open(filename, encoding="utf-8") as yamlfd:
            config = yaml.safe_load(yamlfd)
        host = config['zkserver'][self._env]['host'] + ':' + str(config['zkserver'][self._env]['port'])
        zk = KazooClient(hosts=host)
        zk.start(200)
        try:
            zk_full_name = '/services' + '/' + self._env + '/' + self._appkey
            if zk.exists(zk_full_name) is None:
                raise Exception('service not started - appkey: ' + self._appkey +
                                '; service: ' + self._service_name)
            nodes = []
            for service in zk.get_children(zk_full_name):
                if service.startswith(self._service_name + '_'):
                    nodes.append(zk.get(zk_full_name + '/' + service))
        finally:
            # release the zookeeper session opened for this lookup
            zk.stop()
            zk.close()

        # keep only the nodes that are switched on
        addrs = []
        total_weight = 0
        for node in nodes:
            infos = json.loads(node[0])
            if infos['valid'] == 1:
                addrs.append(infos)
                total_weight += infos['weight']
        if len(addrs) < 1:
            raise Exception('service not started - appkey: ' + self._appkey +
                            '; service: ' + self._service_name)

        # weighted random choice: a node's chance is proportional to its weight
        randint = random.randint(1, total_weight)
        base_weight = 0
        base_addr = None
        for addr in addrs:
            base_addr = addr
            base_weight += addr['weight']
            if randint <= base_weight:
                break
        return base_addr['host'], base_addr['port']

    def close(self):
        if self._iprot is not None:
            self._iprot.trans.close()
        if self._iprot != self._oprot and self._oprot is not None:
            self._oprot.trans.close()
        self._iprot = self._oprot = None

 

 

setup.py

For how to write setup.py and how to package and upload a PyPI package, see the previous article:

Building a private pypi server with Nexus3

# coding=utf-8
from setuptools import setup

setup(
    name='thriftprobe',
    version='1.0',
    author="techlog",
    license="MIT",
    # package names use dots, not path separators
    packages=[
        'probe',
        'probe.config'
    ],
    package_data={'': ['*.*']},
    install_requires=[
        'kazoo',
        'thriftpy2',
        'redis-py-cluster',
        'redis',
        'pymysql',
        'pyyaml',
    ],
    classifiers=[
        "Topic :: Utilities",
        "Topic :: Internet",
        "Topic :: Software Development :: Libraries :: Python Modules"
    ],
)

 

 

Our service governance package is designed to be simple and easy to use, so using it takes very little code.

 

Writing the idl

service DemoService {
    string say();
}

 

 

Server-side code

from probe.server import start_service


class DemoServiceHandler(object):
    def say(self):
        return "hello world"


start_service('demo.thrift', 'hello_www', 'DemoService', DemoServiceHandler())

 

 

Client-side code

from probe.client import build_client

client = build_client('demo.thrift', 'hello_www', 'DemoService')
print(client.say())

 

 

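Above we built basic service governance, which so far covers service registration, discovery, status monitoring, and manual control.

The system can be further optimized and improved in the following areas:

  1. Further features still need to be added, including version control, service priority management, request rate limiting, and node fault tolerance
  2. Timing instrumentation and monitoring of each call, connection, and response on both client and server still need to be added, which means overriding code such as the TThreadedServer class
  3. The client does not need to reconnect to zookeeper or re-fetch the node list on every call. Instead it can register triggers for node changes with @zk.ChildrenWatch or @zk.DataWatch and update a local routing table whenever node information changes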
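The local routing table from the last item can be sketched without a live zookeeper. RoutingTable and its methods are hypothetical names; the dictionary named store simulates the node data, and the callback is invoked by hand where a kazoo watch would invoke it.

```python
import json
import threading


class RoutingTable:
    """Local cache of provider nodes, refreshed by watch callbacks."""

    def __init__(self, service_name):
        self._prefix = service_name + "_"
        self._lock = threading.Lock()
        self._nodes = {}  # child name -> decoded node info

    def on_children_change(self, children, fetch):
        """Rebuild the table from the current child list.

        `children` is the list a children watch would deliver; `fetch` is a
        callable returning the raw JSON bytes stored on one child node.
        """
        fresh = {}
        for child in children:
            if child.startswith(self._prefix):
                fresh[child] = json.loads(fetch(child))
        with self._lock:
            self._nodes = fresh

    def valid_nodes(self):
        with self._lock:
            return [n for n in self._nodes.values() if n["valid"] == 1]


# Simulated zookeeper content; a real client would read it with zk.get().
store = {
    "DemoService_10.0.0.1_8081": b'{"host": "10.0.0.1", "port": 8081, "weight": 10, "valid": 1}',
    "DemoService_10.0.0.2_8081": b'{"host": "10.0.0.2", "port": 8081, "weight": 10, "valid": 0}',
}
table = RoutingTable("DemoService")
table.on_children_change(list(store), store.__getitem__)
print([n["host"] for n in table.valid_nodes()])  # ['10.0.0.1']

# The first provider's session expires, so its ephemeral node disappears.
del store["DemoService_10.0.0.1_8081"]
table.on_children_change(list(store), store.__getitem__)
print(table.valid_nodes())  # []
```

With kazoo, a function decorated with @zk.ChildrenWatch(path) receives the current child list and could forward it to on_children_change. A children watch fires only when nodes are added or removed, so changes to weight or valid on an existing node would additionally need a data watch per child.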

 
