巧用策略模式-逃离苦力模式

  • 背景
  • 解决方案
    • 解决方案-1
      • 出现的问题
    • 解决方案-2
      • 如何解决上述问题
      • 策略模式伪代码快速理解(只需2分钟)
      • 优化后的代码结构
  • 解决效果

背景

最近的主要工作是包装AI 算法,使之成为算法服务集群。说白了就是包装若干算法能力,提供远程调用接口,供各个调用方来调用。算法主要是媒体资源的处理,包括打标签、媒体资源质量提升(分辨率提升、画面质量提升)
算法模块比较多,大约20个左右。

解决方案:

解决方案-1

因项目比较紧急,所以起初通过搬砖模式完成了3个模块的落地,落地也没什么架构可言,就像下面这样:


image.png

(PS:因项目比较紧急,所以前期并没有做细致的容量性能评估,只能后续优化了,当前情况下先上能力比什么都重要,如果是比较大的项目,或者要求服务稳定性、性能高的服务,必须要做容量性能评估,才好确认什么是好的架构,而且这个细致的过程需要经过技术团队的评审,并且需要预留充足的评审时间,否则加班是难免的)

详细解释:

  • 绿色框代表客户端请求,其实就是内部主调方,调用模式分为存量调用 & 在线调用,具体调用量暂时不详。(优化阶段需要详细了解一下)
  • 蓝色虚线框中是部署的若干服务模块,m_srv_x代表不同的服务能力。
    工作模式大致是:客户端请求-> 请求参数处理->下载待分析资源->m_srv处理->吐出结果->回调通知调用方处理结果

带来的问题:

在包装了3个模块之后,个人发现:
不考虑服务架构上面的问题

  1. 个人陷入苦力模式、落地模块重复操作太多,带来的问题就是,重复代码量迅速扩张;
  2. 每落地一个模块就需要包装1个接口,接口量迅速提升,而且需要同步维护对接的api 文档,多的是copy,大大影响工作效率。

在紧急上线了3个模块之后,我对上述无脑式的垒代码的工作方式产生了反思。如何解决上面2个问题,是我接下来最需要优先解决的。否则,大概率陷入到苦力模式,陷入泥潭。

解决方案-2

如何解决上述问题

总结出了上述2个问题,接下来的事情就是一一分析、找解决方案了:

  1. 苦力模式- 对于每个模块重复的代码,暂且称之为前置处理(抽象共性)吧,每个模块的前置处理大致都相同,可以将相同代码提取、封装为common_util,即复用
  2. 接口量过多问题-只写一个接口,当前接口充当阉割式的API 网管,做简单参数校验、请求路由(PS:负载均衡、服务降级、熔断、限流、监控是API网关需要着重考虑的点,当前场景不需要考虑上述问题,撸一个API网关,工作量很大,不值当),这样,主调方接入也非常方便,API接口文档 ,只需要按需做少量更新,进而解放生产力。
  3. 2 中提到了请求路由,那么如何实现-解决办法是客户端请求时携带 http://xxx.com/api?module_name=$module_name ,根据module_name 将请求路由到具体模块。
  4. 2 中请求路由问题解决后,又有新问题,请求路由的具体实现方式是什么呢? 难道要写若干if... else来实现,实现起来也没问题,但是会带来如下问题:
    1. 如果某个模块代码写的有问题,那么其他模块也不能成功运行。
    1. 如果要下线、上线新模块,那么代码就需要重新更新,添加新的if...else代码块
      4中的2个问题有没有解决方案呢?有的,那就是通过策略设计模式来实现。

策略模式伪代码快速理解(只需2分钟)

1. 定义策略模式契约 -AbstractHandler

class AbstractHandler:
     def __init__(self, next_handler=None):
          self.next_handler = next_handler
    
    def _judge(self, request):
         pass
    
   def _handler(self):
        # 处理请求,吐出结果
        pass

     def handler(self, request):
          if judgement(request):
            self._handler(request):
          else:
            if self.next:
               return self.next.handler(request)
         return None

2. 定义具体策略执行类-MSrv1Handler

class Msr1Handler(AbstractHandler):
    def __init__(self):
         super().__init__()
  
   def _judge(self, request):
        return True if request > 10 else False
           
   def _handler(self):
       print('Msr1Handler handled this request')

3. 定义具体策略执行类-MSrv2Handler

class Msr2Handler(AbstractHandler):
    def __init__(self):
         super().__init__()
  
   def _judge(self, request):
        return True if request < 10 else False
           
   def _handler(self):
       print('Msr2Handler handled this request')

...
简介下上述3个文件充当的角色:

  • AbstractHandler-策略契约定义者,其中定义了处理过程,如果当前遵守契约的处理单元不能处理此请求,则交由下一个处理单元去处理。
  • Msr1Handler & Msr2Handler- 遵守(extends)策略契约的处理单元,其中只定义了_judge() 判定自己可处理请求、和 _handler() 处理请求的核心逻辑,但是其具有 handel() 方法,因为通过 class Msr2Handler(AbstractHandler):继承而来。
  • AbstractHandler next 属性 - next把遵守契约的处理单元串成链表,这样请求就可以在这条链上传递,直至被某个处理单元处理。

优化后的代码结构

分层结构

代码分层

简单解释下:

common: 封装了常用操作,如下载资源、生成输出文件存储目录、下载文件重命名等功能‘
exception: 自定义的业务逻辑异常
handler_engine: Ai 模块处理引擎仓库,可实现平铺式新增能力。
module_handler_list: 模块引擎初始化入口
workflow: 一个统一化接口,接收请求,并路由请求到处理引擎。

抽象引擎-策略契约定义者 AbstractModuleHandler

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
# @Time    : 2020/4/4 下午8:32
# @Author  : 
# @Site    : 
# @File    : handler.py
# @Software: PyCharm
# 抽象处理逻辑为职责链,通过外部请求参数控制链节点进行请求处理
  eg: xx?module_name=A -> moduleA.handler()
"""
import json
import functools

print = functools.partial(print, flush=True)
from concurrent.futures import ThreadPoolExecutor

from config.ai_module_config import all_modules
from common.common_handler_util import *
from exception.async_handler_exception import AsyncHandlerException

EXECUTOR = ThreadPoolExecutor(8)


class AbstractRequestEntity():
    """请求实例化为entity"""


class AbstractModuleHandler:
    """
    抽象模型处理器
    """

    def __init__(self):
        self.fill_attr()
        self.next_module_handler = None

    def _fill_attr(self, attr_tuple):
        """
        动态填充属性 辅助函数
        :param attr_tuple: (key,val) = self.$key = $val
        :return:
        """

        if not isinstance(attr_tuple, tuple):
            return False, 'dynamic attr not match format,please give attr like (key,val)'

        attr_name, attr_val = attr_tuple
        if getattr(self, attr_name, None):
            return
        setattr(self, attr_name, attr_val)

    def fill_attr(self):
        current_module = all_modules.get(self._module_name, {})
        for module in current_module.items():
            self._fill_attr(module)
        return

    def _check_request_param(self, request):
        if self._module_name != request.get('module_name', ''):
            ''' judge request module name '''
            return False, 'this module not the correct module handler'
        for param in request.items():
            param_name = param[0]
            request_params = getattr(self, 'request_params', None)
            if not request_params:
                return False, 'this module don\'t need request params!'
            if param_name not in request_params:
                return False, 'this request is not legitimate'
        return True, 'check request params success'

    def check_request_params(self, request):
        if not request:
            return False, 'request params not be none!', ''
        if not isinstance(request, dict):
            return False, 'request params\'s type must be map', ''
        return self._check_request_param(request), ''

    def _async_prev_handler(self, request):
        """
        前置处理器
        :param request:
        :return:
        """
        local_file_path = ''
        output_root_dir = ''

        source_url = request.get('source_url', {})
        download_flag = getattr(self, 'download_flag')
        store_local_flag = getattr(self, 'store_local_flag')
        allow_media_type = getattr(self, 'allow_media_type')
        source_type = source_url.split('.')[-1]
        origin_file_name = source_url.split('/')[-1]

        # 验证资源
        if source_type not in allow_media_type:
            raise AsyncHandlerException(res_code=-1, res_msg='source type not allow')
        # 下载资源
        if download_flag:
            try:
                print(source_url)
                ret, msg, local_file_path, gen_date = download_remote_source(remote_uri=source_url,
                                                                             file_name=origin_file_name,
                                                                             module_name=self._module_name)
            except Exception as err:
                raise AsyncHandlerException(res_code=-2,
                                            res_msg='download remote source has occurred failed,detail=%s' % str(err))
        # 创建输出文件夹
        if download_flag and store_local_flag:
            output_root_dir = gen_store_root_dir(gen_date, 'output', self._module_name)

        return local_file_path, output_root_dir, request

    def _async_core_ability(self, *args):
        """
        核心能力
        :return:
        """

    def _async_post_handler(self, *args):
        """
        核心处理器完成之后,进行后续处理 如 upload cdn
        :return:
        """

    def _async_after_completion(self, *args):
        """post handler 完成后,进行回调通知 or 资源销毁等动作"""
        print(args)
        ret_data, request = args
        ret_data['data']['extra'] = request.get('extra')
        call_back_url = request.get('call_back_url')
        payload = "{\"bussiness_data\": %s\n}" % json.dumps(ret_data)
        headers = {
            'content-type': "application/json"
        }
        response = requests.request("POST", call_back_url, data=payload, headers=headers)
        print(response.text)

    def async_handler(self, request):
        try:
            # 异步前置处理
            args = self._async_prev_handler(request)
            # 异步核心能力调用
            args = self._async_core_ability(args)
            # 异步后置处理,如上传cdn
            ret_data = self._async_post_handler(args)
        except Exception as err:
            if isinstance(err, AsyncHandlerException):
                ret_data = err.res_data if err.res_data else {"res_code": err.res_code, "err_msg": err.res_msg}
            else:
                ret_data = {"res_code": -10,
                            "err_msg": "server has occurred error,detail=%s,please call bofengliu" % str(err)}
        finally:
            # 异步清理资源,回调处理,没想好怎么写
            self._async_after_completion(ret_data, request)

    def handle(self, request):
        """
        核心处理器
        :param request:
        :return:
        """
        ret, msg = self._check_request_param(request)
        if not ret:
            if self.next_module_handler:
                return self.next_module_handler.handle(request)
            return json.dumps(
                {"res_code": -6, "err_msg": "request params not match %s required params" % self._module_name})
        # 执行之前先返回,确认接受参数的响应
        EXECUTOR.submit(self.async_handler, request)
        return json.dumps({"res_code": 0, "err_msg": "received %s task success, running now!" % self._module_name})

最重要的是 handler() & async_handler()方法,在这个类中,我们定义了处理请求的核心接口,其他具体实现这个抽象契约类的子类,只需要关注 _async_core_ability()_async_post_handler() 后置处理了。

统一化阉割版API 网关:

@app.route('/module/v1/api', methods=['POST'])
def dispatcher_handler():
    """
    统一网管
    :return:
    """
    if 'POST' != request.method:
        return jsonify({
            "res_code": 1,
            "err_msg": "request method must be POST"
        })

    request_params = request.get_json()
    return all_module_handlers.handle(request_params)

优化后可以预想到的提升效果

采用策略模式后,如果新上线一个模型,我们只需要新建一个继承策略类的module类,且实现 _async_core_ability()_async_post_handler() 就可以了,接口完全不需要改变。工作效率提升 70% 没有问题,至此就已经逃离了苦力模式,美滋滋。

发现这里面其实还应用到了模板方法模式,其实就是 AbstractHandler 类;

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352

推荐阅读更多精彩内容