LIVEKIT_URL=wss://kuntool-mgegyozf.livekit.cloud LIVEKIT_API_KEY=APIj76wPUGZzkrV LIVEKIT_API_SECRET=p5Uu8HIrT6Vo9o1EOnmeCzvef8b6RKwuf6i9VCxJNzYB
AiAgent定制服务系统
基于个性化定制+简单定制实现的,客户可以跟我们的项目经理沟通功能,由后端工程师和AiAgent工程师进行创建,有一些简易的功能(例如固定的pormpt)我们提供相应的接口
企业客户购买一套我们的小程序,技术支持,和运维人员去部署一套微信小程序,并且设置client_id
后台还能对页面配置进行修改,权限控制,root:进行创建账号,用户登录时返回可看见页面树状结构,
实时语音聊天模块 Realtime
当时我和组长负责这一个模块尝试,需要先跑通demo.我们大模型集成的是coze平台,当时我先查找了官方文档,并没有这个功能.
首先我们就先进行任务拆解,这个任务可以拆解成,前端获取语音对话流,发送给后端,后端将语音发送给语音转文字的ASR模型,在将生成的文字发送给大模型,大模型可以返回流式文字,再进行TTS语音合成.所有的功能呢都是由后端通过restful接口进行操作的,这其中的时间链路非常的长.我们经过测试前端发送一条10秒的语音,后端需要长达30秒才能进行响应,大模型语音打断也不灵敏,而且用户的语音数据都需要经过后端服务器,数据并发量大起来的话,对于带宽的要求也很高.所以就否决了这个流程
后面想着这些链路能不能直接在前端实现,由前端直接完成这一套的调用,后端就负责进行处理业务方面的校验.事实上这个是可行的,但是安全性难以保证.万一其他人逆向出前端存储的apikey,被盗刷这个损失就很大.后来了解到WebRTC技术,它是通过room的概念实现,将客户端加入room,连接webRTC服务器再将大模型一系列服务加入room,使他们实现相互通信,视频会议也是通过这个实现的.接下来我们确定好了技术实现,搭建了WebRTC服务器,使用的是liveKit这个开源项目,同时还能集成所需要的这些模型.
整体流程清楚了,前端先向后端发送一个createRoom请求,后端生成room_id,后端向livekit服务器发送一个请求生成token,和连接url 还有这写转文字接口密钥和其他一些信息 ,并记录下用户信息,如果在原来的对话基础上就是需要进行查数据库获取到原先的conversation记录,发送给大模型传输上下文.如果是新对话就需要create_conversation.还支持指定LLM模型,
前端通过token和url使用livekit的sdk就可以进行连接,
后端通过webhook就可以实现对用户状态的监听,加入房间,对话,对话打断,离开房间,livekit会发送webhook请求后端.然后就可以进行一些业务处理了.例如对话计费,
AI润色
面试官问:你在项目中是如何实现实时语音聊天的?
我参与了一个实时语音对话模块的开发,整体目标是实现用户通过语音与大模型实时交流的能力。
第一阶段:探索与问题分析
初期我和组长尝试了传统的串联方案:
- 前端采集语音流发给后端;
- 后端调用 ASR 语音识别模型,将语音转成文本;
- 然后将文本发给大模型进行处理;
- 最后再将大模型返回的文本转为语音(TTS)返回前端。
我们将这套流程跑通了,但在实测中发现几个关键问题:
- 整个链路延迟很高,10秒语音处理可能要30秒;
- 大模型语音打断不灵敏,体验差;
- 所有语音数据都要通过后端,带宽和并发压力大。
因此我们否决了这一方案。
第二阶段:方案重构与技术选型
为了降低延迟,我们考虑能否将语音链路前置到前端,只让后端处理业务逻辑。这个方案虽可行,但存在 API Key 暴露风险,安全性无法保障。
于是我们开始调研实时通信技术,最终选型了 WebRTC + LiveKit 方案:
- WebRTC 允许音视频低延迟传输;
- LiveKit 是一个开源的 WebRTC 框架,支持 room 概念,适合多人通信,也可以将模型服务作为 client 加入 room。
第三阶段:系统设计与实现流程
我们构建的系统整体流程如下:
- 前端发起
createRoom
请求; - 后端生成 roomId 和 LiveKit Token,请求 LiveKit Server,返回连接参数;
- 后端同时准备好需要的 ASR、TTS、大模型调用信息;
- 如果是延续对话,后端会从数据库查询 conversation 上下文传给大模型;
- 前端用 LiveKit SDK 和 Token 加入房间,实现语音流的传输;
- 模型服务以 Client 身份也加入 room,接收音频流、处理并返回结果;
- LiveKit Webhook 用于监听用户状态变化(加入、离开、对话中断),后端可基于此进行日志记录、计费等业务处理。
这个架构极大降低了延迟,同时也增强了可扩展性,体验上更接近实时对话。
为什么不直接用 WebSocket 做实时语音聊天,而选 WebRTC?
我们其实一开始也考虑过用 WebSocket 实现语音传输,但后来分析后发现 WebSocket 并不适合这种场景,原因有几点:
1. WebSocket 是基于 TCP 的文本/二进制通道,不适合低延迟音视频传输
WebSocket 更适合传输文本或结构化数据,比如实时文本聊天、游戏状态同步、协作编辑等。
但实时语音对话对延迟要求很高,一旦卡顿或抖动,体验就会断裂。而 WebSocket 缺乏音视频传输专用的网络优化机制,比如丢包容忍、带宽自适应、回声消除等。
2. WebRTC 是为音视频场景专门设计的,具备天生优势
- 使用 UDP 通信,支持低延迟、高实时性;
- 内置了音频编解码器、降噪、回声消除等功能;
- 支持点对点连接和多端参与(room),很适合我们这种用户和模型服务同时加入房间的架构。
3. WebRTC + LiveKit 更适合我们的服务架构
LiveKit 提供了完整的房间管理、Token 认证、媒体路由等功能,我们可以将大模型语音服务也作为 client 加入房间,实现音频流的自动流转,不需要手动写音频流分包、同步、重组等底层逻辑。
而如果用 WebSocket,我们需要自己处理音频采集、分包、时序控制、模型对接等大量底层细节,开发成本高、稳定性差。
小程序大模型对话
使用的是websocket连接,建立连接
大模型flux异步响应,用户端发送content,后端接收就请求大模型接口,响应,一个字一个字进行返回
并发量
设置了WebSocketSessionManager,用于管理连接,使用ConcurrentHashMap<String, Session>
进行存储用户user_id和用户session, 连接生命周期的绑定,连接开始调用onopen方法将user_id和seeion加到concurenhashmap中和,连接关闭onclose时同时移除,还可以设置心跳机制
- 小程序定时发送心跳包(如每 30 秒)
- 后端接收
ping
数据,刷新连接时间 - 定时清理超过 N 分钟无心跳的连接(释放资源)
1
2
3
4
5
6
7
8
9
@Scheduled(fixedRate = 60000)
public void cleanDeadSessions() {
for (Map.Entry<String, Session> entry : sessions.entrySet()) {
if (!entry.getValue().isOpen()) {
sessions.remove(entry.getKey());
}
}
}
限流策略,一个用户智能保持一个websocket连接,限制用户的发送消息的频率
引入spring提供的线程池ThreadPoolTaskExcutor底层封装的ThreadPoolExecutor,有ExecutorConfigurationSupport方法实现了InitailizingBean,当初始化代码的配置时就会调用这个方法从而实现自动创建ThreadPoolExecutor.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Bean(name ="commonTaskAsyncPool")
public ThreadPoolTaskExecutor commonTaskAsyncPool() {
logger.info("初始化commonTaskAsyncPool开始");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10); // 核心线程数
executor.setMaxPoolSize(1000); // 最大线程数
executor.setQueueCapacity(10000); // 等待队列长度
executor.setKeepAliveSeconds(60); // 非核心线程空闲存活时间
executor.setThreadNamePrefix("Pool-CommonTask");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize(); // 初始化线程池
logger.info("初始化commonTaskAsyncPool结束");
return executor;
}
websocket的onMessage方法调用线程池进行响应
使用spring的而不使用JUC中的ThreadPoolExcutor是应为不受spring生命周期管理,task进行增强则有更多特性使用
corePoolSize——核心线程最大数
maximumPoolSize——线程池最大线程数
keepAliveTime——空闲线程存活时间。
- 当一个非核心线程被创建,使用完归还给线程池
- 一个线程如果处于空闲状态,并且当前的线程数量大于corePoolSize,那么在指定时间后,这个空闲线程会被销毁,这里的指定时间由 keepAliveTime来设定
unit——空闲线程存活时间单位
- 这是keepAliveTime的计量单位
workQueue——等待队列
- 当线程池满了,线程就会放入这个队列中。任务调度再取出
jdk提供四种工作队列
- ArrayBlockingQueue——基于数组的阻塞队列,按FIFO,新任务放队尾
- 有界的数组可以防止资源耗尽问题。
- 当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。
- 如果队列已经是满的,则创建一个新线程,
- 如果线程数量已经达到maxPoolSize,则会执行拒绝策略。
- LinkedBlockingQuene——基于链表的无界阻塞队列,按照FIFO排序
- 其实最大容量为Interger.MAX
- 由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,
- 因此使用该工作队列时,参数maxPoolSize其实是不起作用的
- SynchronousQuene——不缓存任务的阻塞队列
- 生产者放入一个任务必须等到消费者取出这个任务。
- 也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,
- 如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略
- PriorityBlockingQueue——具有优先级的无界阻塞队列
- 优先级通过参数Comparator实现。
threadFactory
- 创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等
handler——拒绝策略
当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,就用到拒绝策略 jdk中提供了4中拒绝策略
- CallerRunsPolicy——主线程自己执行该任务
- 该策略下,在调用者线程中直接执行被拒绝任务的run方法,除非线程池已经shutdown,否则直接抛弃任务。
- AbortPolicy——抛出异常
- 该策略下,直接丢弃任务,并抛出RejectedExecutionException异常
- DiscardPolicy——直接丢弃
- 该策略下,直接丢弃任务,什么都不做
- DiscardOldestPolicy——早删晚进
- 该策略下,抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列
公司微信公众号智能客服
原理层
知识库搭建:将文档上传对文档进行分片,在导入向量数据库RagFlow
大模型优先调用知识库,
具体实现层xml
function call:调用引入spring Ai
MCP
快递查询查询客服
spring ai接入接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Function(
name = "getCurrentWeather",
description = "获取指定城市的当前天气",
inputType = @Function.Parameter(
type = "object",
properties = {
@Function.ParameterProperty(name = "location", type = "string", description = "城市名称")
}
)
)
public String getWeather(@RequestParam String location) {
// 调用真实天气 API(示例)
return "{\"location\": \"" + location + "\", \"temperature\": 25, \"condition\": \"晴\"}";
}
抖音视频信息分析
解析链接,获取视频下载地址,消息发送给rocketMQ,发送给api语音转文字.任务完成时,发送给消费者,因为时json格式的文件,后端进行保存到OSS服务器中,返回url给用户,并保存到用户任务列中中,更新任务状物状态,用户可以调用文件再与aiagent进行分析
Mysql表优化
项目是在2023年启动的,项目前期是测试阶段数据量不足,而且当时各家大模型支持的上下文很不多平均就8k,到了2024年数据量攀升,而且大模型支持的token达到了128k.打开应用点击历史对话,从点击到响应历时2秒,后端通过接口测试工具大概平均是1.6秒,聊天轮次越多数据转json格式越慢.
1
2
3
4
5
[
{"role": "user", "content": "What's the highest mountain in the world?"},
{"role": "assistant", "content": "The highest mountain in the world is Mount Everest."},
{"role": "user", "content": "What is the second?"}
]
新建一张表,引入一个chatId,一次只记录一行数据
后台服务通过,当时编写了一个xxl-job自动执行,一次执行500条数据,大概数据有100万,停机了三个小时.
新的查询逻辑,先从redis找,如果没有按照时间倒序排序,返回前面5轮对话,再将之前的存如redis当中
后端不需要进行json转换,时间降低到百毫秒级别
顺序存储使用List
开发AI应用的经验
pormpt提示词的优化,防止输出的上下文过大,过于繁琐,职责表述清晰
如果调用function call时desc需要描述清晰准确
本来调用了大模型,就增加了一段接口链路,所以功能需要尽量的简洁,尽量接口异步开发