AiAgent定制项目

subtitle

Posted by Kun on April 23, 2025

LIVEKIT_URL=wss://kuntool-mgegyozf.livekit.cloud LIVEKIT_API_KEY=APIj76wPUGZzkrV LIVEKIT_API_SECRET=p5Uu8HIrT6Vo9o1EOnmeCzvef8b6RKwuf6i9VCxJNzYB

AiAgent定制服务系统

基于个性化定制+简单定制实现的,客户可以跟我们的项目经理沟通功能,由后端工程师和AiAgent工程师进行创建,有一些简易的功能(例如固定的pormpt)我们提供相应的接口

企业客户购买一套我们的小程序,技术支持,和运维人员去部署一套微信小程序,并且设置client_id

后台还能对页面配置进行修改,权限控制,root:进行创建账号,用户登录时返回可看见页面树状结构,

实时语音聊天模块 Realtime

当时我和组长负责这一个模块尝试,需要先跑通demo.我们大模型集成的是coze平台,当时我先查找了官方文档,并没有这个功能.

首先我们就先进行任务拆解,这个任务可以拆解成,前端获取语音对话流,发送给后端,后端将语音发送给语音转文字的ASR模型,在将生成的文字发送给大模型,大模型可以返回流式文字,再进行TTS语音合成.所有的功能呢都是由后端通过restful接口进行操作的,这其中的时间链路非常的长.我们经过测试前端发送一条10秒的语音,后端需要长达30秒才能进行响应,大模型语音打断也不灵敏,而且用户的语音数据都需要经过后端服务器,数据并发量大起来的话,对于带宽的要求也很高.所以就否决了这个流程

后面想着这些链路能不能直接在前端实现,由前端直接完成这一套的调用,后端就负责进行处理业务方面的校验.事实上这个是可行的,但是安全性难以保证.万一其他人逆向出前端存储的apikey,被盗刷这个损失就很大.后来了解到WebRTC技术,它是通过room的概念实现,将客户端加入room,连接webRTC服务器再将大模型一系列服务加入room,使他们实现相互通信,视频会议也是通过这个实现的.接下来我们确定好了技术实现,搭建了WebRTC服务器,使用的是liveKit这个开源项目,同时还能集成所需要的这些模型.

整体流程清楚了,前端先向后端发送一个createRoom请求,后端生成room_id,后端向livekit服务器发送一个请求生成token,和连接url 还有这写转文字接口密钥和其他一些信息 ,并记录下用户信息,如果在原来的对话基础上就是需要进行查数据库获取到原先的conversation记录,发送给大模型传输上下文.如果是新对话就需要create_conversation.还支持指定LLM模型,

前端通过token和url使用livekit的sdk就可以进行连接,

后端通过webhook就可以实现对用户状态的监听,加入房间,对话,对话打断,离开房间,livekit会发送webhook请求后端.然后就可以进行一些业务处理了.例如对话计费,

AI润色

面试官问:你在项目中是如何实现实时语音聊天的?

我参与了一个实时语音对话模块的开发,整体目标是实现用户通过语音与大模型实时交流的能力。

第一阶段:探索与问题分析

初期我和组长尝试了传统的串联方案:

  1. 前端采集语音流发给后端;
  2. 后端调用 ASR 语音识别模型,将语音转成文本;
  3. 然后将文本发给大模型进行处理;
  4. 最后再将大模型返回的文本转为语音(TTS)返回前端。

我们将这套流程跑通了,但在实测中发现几个关键问题:

  • 整个链路延迟很高,10秒语音处理可能要30秒;
  • 大模型语音打断不灵敏,体验差;
  • 所有语音数据都要通过后端,带宽和并发压力大。

因此我们否决了这一方案。

第二阶段:方案重构与技术选型

为了降低延迟,我们考虑能否将语音链路前置到前端,只让后端处理业务逻辑。这个方案虽可行,但存在 API Key 暴露风险,安全性无法保障。

于是我们开始调研实时通信技术,最终选型了 WebRTC + LiveKit 方案:

  • WebRTC 允许音视频低延迟传输;
  • LiveKit 是一个开源的 WebRTC 框架,支持 room 概念,适合多人通信,也可以将模型服务作为 client 加入 room。

第三阶段:系统设计与实现流程

我们构建的系统整体流程如下:

  1. 前端发起 createRoom 请求;
  2. 后端生成 roomId 和 LiveKit Token,请求 LiveKit Server,返回连接参数;
  3. 后端同时准备好需要的 ASR、TTS、大模型调用信息;
  4. 如果是延续对话,后端会从数据库查询 conversation 上下文传给大模型;
  5. 前端用 LiveKit SDK 和 Token 加入房间,实现语音流的传输;
  6. 模型服务以 Client 身份也加入 room,接收音频流、处理并返回结果;
  7. LiveKit Webhook 用于监听用户状态变化(加入、离开、对话中断),后端可基于此进行日志记录、计费等业务处理。

这个架构极大降低了延迟,同时也增强了可扩展性,体验上更接近实时对话。

为什么不直接用 WebSocket 做实时语音聊天,而选 WebRTC?

我们其实一开始也考虑过用 WebSocket 实现语音传输,但后来分析后发现 WebSocket 并不适合这种场景,原因有几点:

1. WebSocket 是基于 TCP 的文本/二进制通道,不适合低延迟音视频传输

WebSocket 更适合传输文本或结构化数据,比如实时文本聊天、游戏状态同步、协作编辑等。

实时语音对话延迟要求很高,一旦卡顿或抖动,体验就会断裂。而 WebSocket 缺乏音视频传输专用的网络优化机制,比如丢包容忍、带宽自适应、回声消除等。

2. WebRTC 是为音视频场景专门设计的,具备天生优势

  • 使用 UDP 通信,支持低延迟、高实时性;
  • 内置了音频编解码器、降噪、回声消除等功能;
  • 支持点对点连接和多端参与(room),很适合我们这种用户和模型服务同时加入房间的架构。

3. WebRTC + LiveKit 更适合我们的服务架构

LiveKit 提供了完整的房间管理、Token 认证、媒体路由等功能,我们可以将大模型语音服务也作为 client 加入房间,实现音频流的自动流转,不需要手动写音频流分包、同步、重组等底层逻辑。

而如果用 WebSocket,我们需要自己处理音频采集、分包、时序控制、模型对接等大量底层细节,开发成本高、稳定性差

小程序大模型对话

使用的是websocket连接,建立连接

大模型flux异步响应,用户端发送content,后端接收就请求大模型接口,响应,一个字一个字进行返回

并发量

设置了WebSocketSessionManager,用于管理连接,使用ConcurrentHashMap<String, Session>进行存储用户user_id和用户session, 连接生命周期的绑定,连接开始调用onopen方法将user_id和seeion加到concurenhashmap中和,连接关闭onclose时同时移除,还可以设置心跳机制

  • 小程序定时发送心跳包(如每 30 秒)
  • 后端接收 ping 数据,刷新连接时间
  • 定时清理超过 N 分钟无心跳的连接(释放资源)
1
2
3
4
5
6
7
8
9
@Scheduled(fixedRate = 60000)
public void cleanDeadSessions() {
    for (Map.Entry<String, Session> entry : sessions.entrySet()) {
        if (!entry.getValue().isOpen()) {
            sessions.remove(entry.getKey());
        }
    }
}

限流策略,一个用户智能保持一个websocket连接,限制用户的发送消息的频率

引入spring提供的线程池ThreadPoolTaskExcutor底层封装的ThreadPoolExecutor,有ExecutorConfigurationSupport方法实现了InitailizingBean,当初始化代码的配置时就会调用这个方法从而实现自动创建ThreadPoolExecutor.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Bean(name ="commonTaskAsyncPool") 
public ThreadPoolTaskExecutor commonTaskAsyncPool() {
    logger.info("初始化commonTaskAsyncPool开始");

    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

    executor.setCorePoolSize(10);                 // 核心线程数
    executor.setMaxPoolSize(1000);                // 最大线程数
    executor.setQueueCapacity(10000);             // 等待队列长度
    executor.setKeepAliveSeconds(60);             // 非核心线程空闲存活时间
    executor.setThreadNamePrefix("Pool-CommonTask");
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

    executor.initialize();                        // 初始化线程池

    logger.info("初始化commonTaskAsyncPool结束");
    return executor;
}

websocket的onMessage方法调用线程池进行响应

使用spring的而不使用JUC中的ThreadPoolExcutor是应为不受spring生命周期管理,task进行增强则有更多特性使用

corePoolSize——核心线程最大数

maximumPoolSize——线程池最大线程数

keepAliveTime——空闲线程存活时间。

  • 当一个非核心线程被创建,使用完归还给线程池
  • 一个线程如果处于空闲状态,并且当前的线程数量大于corePoolSize,那么在指定时间后,这个空闲线程会被销毁,这里的指定时间由 keepAliveTime来设定

unit——空闲线程存活时间单位

  • 这是keepAliveTime的计量单位

workQueue——等待队列

  • 当线程池满了,线程就会放入这个队列中。任务调度再取出

jdk提供四种工作队列

  1. ArrayBlockingQueue——基于数组的阻塞队列,按FIFO,新任务放队尾
    • 有界的数组可以防止资源耗尽问题。
    • 当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。
    • 如果队列已经是满的,则创建一个新线程,
    • 如果线程数量已经达到maxPoolSize,则会执行拒绝策略
  2. LinkedBlockingQuene——基于链表的无界阻塞队列,按照FIFO排序
    • 其实最大容量为Interger.MAX
    • 由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,
    • 因此使用该工作队列时,参数maxPoolSize其实是不起作用的
  3. SynchronousQuene——不缓存任务的阻塞队列
    • 生产者放入一个任务必须等到消费者取出这个任务。
    • 也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,
    • 如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略
  4. PriorityBlockingQueue——具有优先级的无界阻塞队列
    • 优先级通过参数Comparator实现。

threadFactory

  • 创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等

handler——拒绝策略

当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,就用到拒绝策略 jdk中提供了4中拒绝策略

  1. CallerRunsPolicy——主线程自己执行该任务
    • 该策略下,在调用者线程中直接执行被拒绝任务的run方法,除非线程池已经shutdown,否则直接抛弃任务。
  2. AbortPolicy——抛出异常
    • 该策略下,直接丢弃任务,并抛出RejectedExecutionException异常
  3. DiscardPolicy——直接丢弃
    • 该策略下,直接丢弃任务,什么都不做
  4. DiscardOldestPolicy——早删晚进
    • 该策略下,抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列

公司微信公众号智能客服

原理层

知识库搭建:将文档上传对文档进行分片,在导入向量数据库RagFlow

大模型优先调用知识库,

具体实现层xml

function call:调用引入spring Ai

MCP

快递查询查询客服

spring ai接入接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Function(
        name = "getCurrentWeather", 
        description = "获取指定城市的当前天气",
        inputType = @Function.Parameter(
            type = "object",
            properties = {
                @Function.ParameterProperty(name = "location", type = "string", description = "城市名称")
            }
        )
    )
    public String getWeather(@RequestParam String location) {
        // 调用真实天气 API(示例)
        return "{\"location\": \"" + location + "\", \"temperature\": 25, \"condition\": \"晴\"}";
    }

抖音视频信息分析

解析链接,获取视频下载地址,消息发送给rocketMQ,发送给api语音转文字.任务完成时,发送给消费者,因为时json格式的文件,后端进行保存到OSS服务器中,返回url给用户,并保存到用户任务列中中,更新任务状物状态,用户可以调用文件再与aiagent进行分析

Mysql表优化

项目是在2023年启动的,项目前期是测试阶段数据量不足,而且当时各家大模型支持的上下文很不多平均就8k,到了2024年数据量攀升,而且大模型支持的token达到了128k.打开应用点击历史对话,从点击到响应历时2秒,后端通过接口测试工具大概平均是1.6秒,聊天轮次越多数据转json格式越慢.

1
2
3
4
5
[
    {"role": "user", "content": "What's the highest mountain in the world?"},
    {"role": "assistant", "content": "The highest mountain in the world is Mount Everest."},
    {"role": "user", "content": "What is the second?"}
]

新建一张表,引入一个chatId,一次只记录一行数据

后台服务通过,当时编写了一个xxl-job自动执行,一次执行500条数据,大概数据有100万,停机了三个小时.

新的查询逻辑,先从redis找,如果没有按照时间倒序排序,返回前面5轮对话,再将之前的存如redis当中

后端不需要进行json转换,时间降低到百毫秒级别

顺序存储使用List

开发AI应用的经验

pormpt提示词的优化,防止输出的上下文过大,过于繁琐,职责表述清晰

如果调用function call时desc需要描述清晰准确

本来调用了大模型,就增加了一段接口链路,所以功能需要尽量的简洁,尽量接口异步开发