跳转至

模式二:Processor模式

此种模式下,对应着上面所说的通过DAG配置数据处理管道,开发者开发的数据处理节点,作为DAG里面的一个节点,对实时数据输入进行处理,并由框架送到下游节点。

🚀开发环境搭建

✅前置要求

  • Python >= 3.10, 首选 Python 3.12
  • 支持apt的Linux发行版,本文档基于Ubuntu 20.04
  • 安装apt公钥,确保执行了以下操作。详情参考:APT包发布管理
    • 步骤1:首先添加内部apt公钥
      apt-get update & apt-get install gnupg & apt-key add nexus_apt_public.gpg.key
      
    • 步骤2:更新apt sources.list文件
      cat >>  /etc/apt/sources.list <<EOF
      deb https://nexus.mybigai.ac.cn/repository/apt-host/ bigai main
      EOF
      

💾安装依赖

# 安装tongos so开发库,version_you_want为您需要的版本号,比如1.0.0-e710ae4
apt update && apt install bigai-eps=${version_you_want}  
# 添加内部镜像源
pip config set global.index-url https://nexus.mybigai.ac.cn/repository/pypi/simple
# 安装tongos python开发包,version_you_want为您需要的版本号,比如0.0.1a36
pip install tongos-general-unit-python==${version_you_want} 

📖开发示例

我们假设您的工程项目结构如下

.(test_processor_demo)
├── test                   # 测试代码
├── server.py              # 您的processor文件
├── component              # 您的核心功能代码,包含模型初始化和推理接口(本示例中非必要)
│ └── my_fun.py
├── config                 # 您的配置文件(本示例中非必要)
│ └── config.yaml
└── requirements.txt       # 所依赖安装包(本示例中非必要)
processor/server.py里面实现您的Processor,继承tongos_eps_service.TongOSEPSUnitService

  1. 编写构造函数, 需初始化父类构造函数 super().__init__()
  2. 编写StreamStart(self, session_id, processor_id, context): 处理session 数据流初始化操作,可省略
  3. 编写StreamEnd(self, session_id, processor_id, context): 处理session 数据流结束操作,可省略
  4. 编写处理函数 MediaChunkProcess( self, session_id, processor_url, processor_id, media_chunks, context): 处理核心逻辑,不可省略
  5. 配置grpc参数,启动服务

server.py示例如下:

# 导入依赖 package
from tongos_general_unit_python.service import tongos_eps_service
from tongos_general_unit_python.util.util import create_logger_by_context
from tongos_general_unit_python.protos import tongos_event_pb2

# 此处假设demo_pb2.py和server.py处于同一目录,实际开发时需要您根据情况调整
from test.demo_pb2 import RGBMedia, DepthMedia, TextMedia

# 一个mock的处理函数,此处由开发者实现自己的处理逻辑
def process(inputs):
    return inputs["rgb"].data + ":" + inputs["depth"].data

class DemoUnitService(tongos_eps_service.TongOSEPSUnitService):
    def __init__(self):
        # 初始化父类
        super().__init__()
        # 开发者自定义的初始化逻辑

    # 一个session的启动触发
    def StreamStart(self, session_id, processor_id, context):
        # 开发者自定义的处理逻辑

        # 请使用框架封装好的log来打印日志
        log = create_logger_by_context(ctx=context)
        log.info("StreamStart start")
        return None

    # 一个session的结束触发
    def StreamEnd(self, session_id, processor_id, context):
        # 开发者自定义的处理逻辑

        log = create_logger_by_context(ctx=context)
        log.info("StreamEnd start")
        return None

    # 处理逻辑
    def MediaChunkProcess(
        self, session_id, processor_url, processor_id, media_chunks, context
    ):
        inputs = {}
        resp_media = {}
        # media_chunks是一个字典,key为TAG:INDEX:STREAM,value为指定类型的数据
        for key, media_chunk in media_chunks.items():
            tag_index_stream = key.split(":")
            if tag_index_stream[0] == "RGB" and tag_index_stream[1] == "0":  # RGB
                inputs["rgb"] = RGBMedia.ChunkData()
                media_chunk.data.Unpack(inputs["rgb"])
            if tag_index_stream[0] == "DEPTH" and tag_index_stream[1] == "0":  # DEPTH
                inputs["depth"] = DepthMedia.ChunkData()
                media_chunk.data.Unpack(inputs["depth"])

        mock_resp = process(inputs)
        # 返回的resp_media是一个字典,key同样为TAG:INDEX:STREAM,此处STREAM不重要,可以用xxx代替
        # value同样是指定类型的数据
        resp_media["TEXT:0:text"] = TextMedia.ChunkData(data=mock_resp)
        return resp_media

service_instance = DemoUnitService()
option = [
    ("grpc.max_send_message_length", 100 * 1024 * 1024),
    ("grpc.max_receive_message_length", 100 * 1024 * 1024),
]
# eps lib 配置
epslibPath = '/usr/lib/libeps_python.so'
service_instance.SetPythonEPSLibPath(epslibPath)
# 服务启动
service_instance.serve("", 10, [], option)

📋线下测试

如需进行线下测试,可以将下列文件添加到test目录下进行测试

.(test_processor_demo)
├── test                   # 测试代码
    ├── client.py
    ├── demo_graph.txt
    ├── demo.proto
    ├── __init__.py
    ├── run_client.sh
    └── run_server.sh
test/run_server.sh

# 服务端接受数据流的端口
export EPS_EVENT_GRPC_PORT=50013
# processor名称
export EPS_CAPABILITY_URL=bigai.tongos.processor.ImageToTextProcesso
# 服务名称
export PROCESSOR_SERVICE=test_service
# 是否连接中控,离线测试必须设置为true
export NO_EPC=true

python server.py

test/client.py

from tongos_general_unit_python.service import client_eps_base
from tongos_general_unit_python.protos import tongos_event_pb2, eps_event_service_pb2
from google.protobuf import any_pb2
from demo_pb2 import RGBMedia, DepthMedia, TextMedia
class TestClientEPS(client_eps_base.ClientEpsBase):
    def __init__(self, config):
        super().__init__(config)

    def process_response(self, response):
        for chunk in response:
            holder = TextMedia.ChunkData()
            chunk.data_chunk.data.Unpack(holder)
            print("resp: {}".format(holder.data))

    def get_media_chunks(self):
        for ts in range(5):
            chunks = []
            demo_media = RGBMedia.ChunkData(
                data = "rgb_data_" + str(ts),
            )
            msg = any_pb2.Any()
            msg.Pack(demo_media)
            event = tongos_event_pb2.TongOSEvent(
                create_ts = ts,
                bigai_event_url = msg.type_url,
                data = msg
            )
            chunk = eps_event_service_pb2.StreamPacket.StreamChunk(
                stream_tag = "RGB",
                stream_index = 0,
                stream_name = "rgb",
                data_chunk = event
            )
            chunks.append(chunk)
            demo_media = DepthMedia.ChunkData(
                data = "depth_data_" + str(ts),
            )
            msg = any_pb2.Any()
            msg.Pack(demo_media)
            event = tongos_event_pb2.TongOSEvent(
                create_ts = ts,
                bigai_event_url = msg.type_url,
                data = msg
            )
            chunk = eps_event_service_pb2.StreamPacket.StreamChunk(
                stream_tag = "DEPTH",
                stream_index = 0,
                stream_name = "depth",
                data_chunk = event
            )
            chunks.append(chunk)
            yield chunks

config = client_eps_base.EpsClientConfig(
    graph_path="demo_graph.txt"
)

client = TestClientEPS(config)
client.run()

test/run_client.sh

# 服务端接受数据流的端口
export EPS_EVENT_GRPC_ADDRESS="127.0.0.1:50013"
# 是否连接中控,离线测试必须设置为true
export NO_EPC=true
python client.py

test/demo.proto

syntax = "proto3";
package demo;

message RGBMedia {
    message ChunkData {
        string data = 1;
    }
}

message DepthMedia {
    message ChunkData {
        string data = 1;
    }
}

message TextMedia {
    message ChunkData {
        string data = 1;
    }
}

可在test目录下执行下述命令生成demo_pb2.py:

python -m grpc_tools.protoc -I. --python_out=. --pyi_out=. *.proto

test/demo_graph.txt

node {
  capability_url: "bigai.tongos.processor.ImageToTextProcessor"
  input_stream: "RGB:0:rgb"
  input_stream: "DEPTH:0:depth"
  output_stream: "TEXT:0:text"
}
input_stream: "rgb"
input_stream: "depth"
output_stream: "text"

__init__.py

空文件。将test目录声明为一个模块,从而使得server能够import demo_pb2

测试时,首先在根目录运行test/run_server.sh启动服务端,然后在test目录下运行run_client.sh,即可分别启动服务端和客户端

此外,若希望尝试服务端和客户端独立部署,只需要将test/run_client.sh中EPS_EVENT_GRPC_ADDRESS更改为对应服务端ip即可。

❓常见问题与解决方案

总结了Processor模式下的常见问题与解决方案,也欢迎开发者在发现问题后与我们联系。

💾conda环境下so出现报错

在conda环境下启动服务时,可能会出现如下报错信息:

OSError: /lib/x86_64-linux-gnu/libgdal.so.26: undefined symbol: TIFFReadRGBATileExt, version LIBTIFF_4.0

undefined symbol: TIFFReadRGBATileExt, version LIBTIFF_4.0

解决方法:

# 问题1:TIFFReadRGBATileExt, version LIBTIFF_4.0
# 在/home/anaconda3/envs/xxx/lib目录中执行
# 先备份lib下的libffi.so.7
sudo ln -s /lib/x86_64-linux-gnu/libffi.so.7.1.0 libffi.so.7
sudo ldconfig

# 问题2:undefined symbol: TIFFReadRGBATileExt, version LIBTIFF_4.0
export LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libtiff.so.5

问题原因:

这一类问题的原因是conda环境下默认的channel中会构建跨版本的软连接,如

(conda_env)   lib pwd
/home/$USER_NAME/anaconda3/lib
(conda_env)   lib ls -lah|grep libffi
lrwxrwxrwx   1 user user   1 libffi.7.so -> libffi.so.8.1.2
lrwxrwxrwx   1 user user   1 libffi.so.7 -> libffi.so.8.1.2
(conda_env)   lib ls -lah|grep libtiff     
lrwxrwxrwx   1 user user   1 libtiff.so.5 -> ./libtiff.so.6

在上述问题1和问题2中,conda默认channel中的feedstock在安装依赖时会将libffi.so.7链接至libffi.so.8.1.0,详见libffi-feedstock/recipe/build.sh以及libtiff-feedstock/recipe/build.sh相关问题已经被提出,但维护者并未给出修复或解决办法。

除了上述方案外,另一个可行的修复方法为使用社区维护的conda-forge代替默认的channel。