模式二:Processor模式
此种模式下,对应着上面所说的通过DAG配置数据处理管道,开发者开发的数据处理节点,作为DAG里面的一个节点,对实时数据输入进行处理,并由框架送到下游节点。
🚀开发环境搭建
✅前置要求
- Python >= 3.10, 首选 Python 3.12
- 支持apt的Linux发行版,本文档基于Ubuntu 20.04
- 安装apt公钥,确保执行了以下操作。详情参考:APT包发布管理
- 步骤1:首先添加内部apt公钥
- 步骤2:更新apt sources.list文件
💾安装依赖
# 安装tongos so开发库,version_you_want为您需要的版本号,比如1.0.0-e710ae4
apt update && apt install bigai-eps=${version_you_want}
# 添加内部镜像源
pip config set global.index-url https://nexus.mybigai.ac.cn/repository/pypi/simple
# 安装tongos python开发包,version_you_want为您需要的版本号,比如0.0.1a36
pip install tongos-general-unit-python==${version_you_want}
📖开发示例
我们假设您的工程项目结构如下
.(test_processor_demo)
├── test # 测试代码
├── server.py # 您的processor文件
├── component # 您的核心功能代码,包含模型初始化和推理接口(本示例中非必要)
│ └── my_fun.py
├── config # 您的配置文件(本示例中非必要)
│ └── config.yaml
└── requirements.txt # 所依赖安装包(本示例中非必要)
- 编写构造函数, 需初始化父类构造函数 super().__init__()
- 编写StreamStart(self, session_id, processor_id, context): 处理session 数据流初始化操作,可省略
- 编写StreamEnd(self, session_id, processor_id, context): 处理session 数据流结束操作,可省略
- 编写处理函数 MediaChunkProcess( self, session_id, processor_url, processor_id, media_chunks, context): 处理核心逻辑,不可省略
- 配置grpc参数,启动服务
server.py示例如下:
# 导入依赖 package
from tongos_general_unit_python.service import tongos_eps_service
from tongos_general_unit_python.util.util import create_logger_by_context
from tongos_general_unit_python.protos import tongos_event_pb2
# 此处假设demo_pb2.py和server.py处于同一目录,实际开发时需要您根据情况调整
from test.demo_pb2 import RGBMedia, DepthMedia, TextMedia
# 一个mock的处理函数,此处由开发者实现自己的处理逻辑
def process(inputs):
return inputs["rgb"].data + ":" + inputs["depth"].data
class DemoUnitService(tongos_eps_service.TongOSEPSUnitService):
def __init__(self):
# 初始化父类
super().__init__()
# 开发者自定义的初始化逻辑
# 一个session的启动触发
def StreamStart(self, session_id, processor_id, context):
# 开发者自定义的处理逻辑
# 请使用框架封装好的log来打印日志
log = create_logger_by_context(ctx=context)
log.info("StreamStart start")
return None
# 一个session的结束触发
def StreamEnd(self, session_id, processor_id, context):
# 开发者自定义的处理逻辑
log = create_logger_by_context(ctx=context)
log.info("StreamEnd start")
return None
# 处理逻辑
def MediaChunkProcess(
self, session_id, processor_url, processor_id, media_chunks, context
):
inputs = {}
resp_media = {}
# media_chunks是一个字典,key为TAG:INDEX:STREAM,value为指定类型的数据
for key, media_chunk in media_chunks.items():
tag_index_stream = key.split(":")
if tag_index_stream[0] == "RGB" and tag_index_stream[1] == "0": # RGB
inputs["rgb"] = RGBMedia.ChunkData()
media_chunk.data.Unpack(inputs["rgb"])
if tag_index_stream[0] == "DEPTH" and tag_index_stream[1] == "0": # DEPTH
inputs["depth"] = DepthMedia.ChunkData()
media_chunk.data.Unpack(inputs["depth"])
mock_resp = process(inputs)
# 返回的resp_media是一个字典,key同样为TAG:INDEX:STREAM,此处STREAM不重要,可以用xxx代替
# value同样是指定类型的数据
resp_media["TEXT:0:text"] = TextMedia.ChunkData(data=mock_resp)
return resp_media
service_instance = DemoUnitService()
option = [
("grpc.max_send_message_length", 100 * 1024 * 1024),
("grpc.max_receive_message_length", 100 * 1024 * 1024),
]
# eps lib 配置
epslibPath = '/usr/lib/libeps_python.so'
service_instance.SetPythonEPSLibPath(epslibPath)
# 服务启动
service_instance.serve("", 10, [], option)
📋线下测试
如需进行线下测试,可以将下列文件添加到test目录下进行测试
.(test_processor_demo)
├── test # 测试代码
├── client.py
├── demo_graph.txt
├── demo.proto
├── __init__.py
├── run_client.sh
└── run_server.sh
# 服务端接受数据流的端口
export EPS_EVENT_GRPC_PORT=50013
# processor名称
export EPS_CAPABILITY_URL=bigai.tongos.processor.ImageToTextProcesso
# 服务名称
export PROCESSOR_SERVICE=test_service
# 是否连接中控,离线测试必须设置为true
export NO_EPC=true
python server.py
test/client.py
from tongos_general_unit_python.service import client_eps_base
from tongos_general_unit_python.protos import tongos_event_pb2, eps_event_service_pb2
from google.protobuf import any_pb2
from demo_pb2 import RGBMedia, DepthMedia, TextMedia
class TestClientEPS(client_eps_base.ClientEpsBase):
def __init__(self, config):
super().__init__(config)
def process_response(self, response):
for chunk in response:
holder = TextMedia.ChunkData()
chunk.data_chunk.data.Unpack(holder)
print("resp: {}".format(holder.data))
def get_media_chunks(self):
for ts in range(5):
chunks = []
demo_media = RGBMedia.ChunkData(
data = "rgb_data_" + str(ts),
)
msg = any_pb2.Any()
msg.Pack(demo_media)
event = tongos_event_pb2.TongOSEvent(
create_ts = ts,
bigai_event_url = msg.type_url,
data = msg
)
chunk = eps_event_service_pb2.StreamPacket.StreamChunk(
stream_tag = "RGB",
stream_index = 0,
stream_name = "rgb",
data_chunk = event
)
chunks.append(chunk)
demo_media = DepthMedia.ChunkData(
data = "depth_data_" + str(ts),
)
msg = any_pb2.Any()
msg.Pack(demo_media)
event = tongos_event_pb2.TongOSEvent(
create_ts = ts,
bigai_event_url = msg.type_url,
data = msg
)
chunk = eps_event_service_pb2.StreamPacket.StreamChunk(
stream_tag = "DEPTH",
stream_index = 0,
stream_name = "depth",
data_chunk = event
)
chunks.append(chunk)
yield chunks
config = client_eps_base.EpsClientConfig(
graph_path="demo_graph.txt"
)
client = TestClientEPS(config)
client.run()
test/run_client.sh
# 服务端接受数据流的端口
export EPS_EVENT_GRPC_ADDRESS="127.0.0.1:50013"
# 是否连接中控,离线测试必须设置为true
export NO_EPC=true
python client.py
test/demo.proto
syntax = "proto3";
package demo;
message RGBMedia {
message ChunkData {
string data = 1;
}
}
message DepthMedia {
message ChunkData {
string data = 1;
}
}
message TextMedia {
message ChunkData {
string data = 1;
}
}
可在test目录下执行下述命令生成demo_pb2.py:
test/demo_graph.txt
node {
capability_url: "bigai.tongos.processor.ImageToTextProcessor"
input_stream: "RGB:0:rgb"
input_stream: "DEPTH:0:depth"
output_stream: "TEXT:0:text"
}
input_stream: "rgb"
input_stream: "depth"
output_stream: "text"
__init__.py
空文件。将test目录声明为一个模块,从而使得server能够import demo_pb2
测试时,首先在根目录运行test/run_server.sh启动服务端,然后在test目录下运行run_client.sh,即可分别启动服务端和客户端
此外,若希望尝试服务端和客户端独立部署,只需要将test/run_client.sh中EPS_EVENT_GRPC_ADDRESS更改为对应服务端ip即可。
❓常见问题与解决方案
总结了Processor模式下的常见问题与解决方案,也欢迎开发者在发现问题后与我们联系。
💾conda环境下so出现报错
在conda环境下启动服务时,可能会出现如下报错信息:
OSError: /lib/x86_64-linux-gnu/libgdal.so.26: undefined symbol: TIFFReadRGBATileExt, version LIBTIFF_4.0
undefined symbol: TIFFReadRGBATileExt, version LIBTIFF_4.0
解决方法:
# 问题1:TIFFReadRGBATileExt, version LIBTIFF_4.0
# 在/home/anaconda3/envs/xxx/lib目录中执行
# 先备份lib下的libffi.so.7
sudo ln -s /lib/x86_64-linux-gnu/libffi.so.7.1.0 libffi.so.7
sudo ldconfig
# 问题2:undefined symbol: TIFFReadRGBATileExt, version LIBTIFF_4.0
export LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libtiff.so.5
问题原因:
这一类问题的原因是conda环境下默认的channel中会构建跨版本的软连接,如
(conda_env) ➜ lib pwd
/home/$USER_NAME/anaconda3/lib
(conda_env) ➜ lib ls -lah|grep libffi
lrwxrwxrwx 1 user user 1 libffi.7.so -> libffi.so.8.1.2
lrwxrwxrwx 1 user user 1 libffi.so.7 -> libffi.so.8.1.2
(conda_env) ➜ lib ls -lah|grep libtiff
lrwxrwxrwx 1 user user 1 libtiff.so.5 -> ./libtiff.so.6
在上述问题1和问题2中,conda默认channel中的feedstock在安装依赖时会将libffi.so.7链接至libffi.so.8.1.0,详见libffi-feedstock/recipe/build.sh以及libtiff-feedstock/recipe/build.sh。相关问题已经被提出,但维护者并未给出修复或解决办法。
除了上述方案外,另一个可行的修复方法为使用社区维护的conda-forge代替默认的channel。