Java技术栈
本文最后更新于:2024年6月21日 凌晨
SSE实现消息推送
SSE(Server-Sent Events)
当涉及到部分请求,后端处理时间较长,使用常规Http请求,页面等待时间太长,对用户不友好,故考虑使用长链接进行消息推送,
可选方案有WebSocket、SSE。WebSocket可实现双工通信,SSE仅支持服务端向客户端推送消息,根据实际使用场景,SSE即可满足,故选用SSE。
在SpringBoot
中,先编写SSE服务SseEmitterServer
@Slf4j
@Service
public class SseEmitterServer {
private final static Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();
/**
* 创建新连接
*/
public synchronized SseEmitter createSession() throws IOException {
// 设置过期时间为0 表示永不过期
SseEmitter sseEmitter = new SseEmitter(0L);
String id = UUID.randomUUID().toString().replaceAll("-", "");
if (!SSE_CACHE.containsKey(id)) {
SSE_CACHE.put(id, sseEmitter);
log.info("客户端:[{}]新建成功,当前客户端总数为:[{}]", id, SSE_CACHE.size());
}
return sseEmitter;
}
public void closeSession(String clientId){
if (SSE_CACHE.containsKey(clientId)){
SSE_CACHE.get(clientId).complete();
SSE_CACHE.remove(clientId);
log.info("客户端:[{}]关闭成功,当前剩余客户端总数为:[{}]", clientId, SSE_CACHE.size());
}
}
/**
* 定时任务
*/
@Scheduled(fixedDelay = 3, initialDelay = 1)
public void job() {
if (SSE_CACHE.size() > 0){
String msg = UUID.randomUUID().toString();
System.out.println("定时任务发送消息:" + msg);
for (Map.Entry<String, SseEmitter> entry : SSE_CACHE.entrySet()) {
SseEmitter sseEmitter = SSE_CACHE.get(entry.getKey());
try {
sseEmitter.send(SseEmitter.event().reconnectTime(1000).id(entry.getKey()).data(msg));
} catch (IOException e) {
SSE_CACHE.remove(entry.getKey());
}
}
}
}
public void sendMessageToClient(String clientId, String message) {
SseEmitter sseEmitter = SSE_CACHE.get(clientId);
if (sseEmitter != null) {
try {
sseEmitter.send(SseEmitter.event().reconnectTime(1000).id(clientId).data(message));
} catch (IOException e) {
SSE_CACHE.remove(clientId);
}
}
}
public void sendMessageToAllClients(String message) {
for (Map.Entry<String, SseEmitter> entry : SSE_CACHE.entrySet()) {
try {
entry.getValue().send(SseEmitter.event().reconnectTime(1000).id(entry.getKey()).data(message));
} catch (IOException e) {
SSE_CACHE.remove(entry.getKey());
}
}
}
}
接口使用:SseController
:
@RestController
@CrossOrigin
public class SseController {
@Resource
private SseEmitterServer sseEmitterServer;
@GetMapping("/connect")
public SseEmitter connect() throws IOException {
return sseEmitterServer.createSession();
}
@GetMapping("/disconnect")
public String disconnect(@RequestParam String clientId) {
sseEmitterServer.closeSession(clientId);
return "Disconnected";
}
@PostMapping("/send")
public Result send(@RequestParam String msg) {
sseEmitterServer.sendMessageToAllClients(msg);
return Result.ok(null);
}
}
在前端使用也非常的方便,下面是vue的一个小示例:
// 创建一个 EventSource 对象实例
let eventSource = null;
onMounted(() => {
eventSource = new EventSource('http://localhost:9091/connect');
eventSource.onmessage = (event) => {
// 获取到后端返回的值
message.value += String.fromCharCode(event.data);
};
});
// 调用接口
await request.post("http://localhost:9091/send?msg=" + encodeURIComponent(text.value))
我们在后端提供两个接口,一个是前端进页面就进行SSE链接,另一个是调用接口,使用SSE返回给前端
初次进入页面时,进行连接
当发送消息调用接口,此处调用语言模型接口,以流式返回:
ElasticSearch
ElasticSearch介绍
ElasticSearch概述
Elasticsearch (简称ES)是一个分布式、高扩展、高实时的、RESTful 风格的搜索与数据分析引擎。它能很方便的使大量数据具有搜索、分析和探索的能力。充分利用Elasticsearch的水平伸缩性,能使数据在生产环境变得更有价值。Elasticsearch 的实现原理主要分为以下几个步骤,首先用户将数据提交到Elasticsearch 数据库中,再通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据,当用户搜索数据时候,再根据权重将结果排名,打分,再将返回结果呈现给用户。
Elasticsearch是面向文档型数据库,一条数据在这里就是一个文档,用JSON作为文档序列化的格式,比如下面这条用户数据:
{
"name" : "John",
"sex" : "Male",
"age" : 25,
"birthDate": "1990/05/01",
"about" : "I love to go rock climbing",
"interests": [ "sports", "music" ]
}
用Mysql这样的数据库存储就会容易想到建立一张User表,有各个字段等,在ElasticSearch里这就是一个文档,当然这个文档会属于一个User的类型,各种各样的类型存在于一个索引当中。这里有一份简易的将Elasticsearch和关系型数据术语对照表:
索引(Index)
一个索引就是一个拥有几分相似特征的文档的集合。比如说,你可以有一个客户数据的索引,另一个产品目录的索引,还有一个订单数据的索引。一个索引由一个名字来标识(必须全部是小写字母),并且当我们要对这个索引中的文档进行索引、搜索、更新和删除的时候,都要使用到这个名字。在一个集群中,可以定义任意多的索引。
能搜索的数据必须索引,这样的好处是可以提高查询速度,比如:新华字典前面的目录就是索引的意思,目录可以提高查询速度。
Elasticsearch索引的精髓:一切设计都是为了提高搜索的性能。
类型(Type)
在一个索引中,你可以定义一种或多种类型。
一个类型是你的索引的一个逻辑上的分类/分区,其语义完全由你来定。通常,会为具有一组共同字段的文档定义一个类型。不同的版本,类型发生了不同的变化
版本 | Type |
---|---|
5.x | 支持多种type |
6.x | 只能有一种type |
7.x | 默认不再支持自定义索引类型(默认类型为:_doc) |
文档(Document)
一个文档是一个可被索引的基础信息单元,也就是一条数据
比如:你可以拥有某一个客户的文档,某一个产品的一个文档,当然,也可以拥有某个订单的一个文档。文档以JSON(Javascript Object Notation)格式来表示,而JSON是一个到处存在的互联网数据交互格式。
在一个index/type里面,你可以存储任意多的文档。
字段(Field)
相当于是数据表的字段,对文档数据根据不同属性进行的分类标识。
映射(Mapping)
mapping是处理数据的方式和规则方面做一些限制,如:某个字段的数据类型、默认值、分析器、是否被索引等等。这些都是映射里面可以设置的,其它就是处理ES里面数据的一些使用规则设置也叫做映射,按着最优规则处理数据对性能提高很大,因此才需要建立映射,并且需要思考如何建立映射才能对性能更好。
ElasticSearch安装
下载软件
下载地址:ElasticSearch
推荐下载7.8版本
:Elasticsearch 7.8.0
windows安装
解压文件,目录结构如下:
目录 | 说明 |
---|---|
bin | 可执行脚本目录 |
config | 配置目录 |
jdk | 内置jdk目录 |
lib | 类库 |
logs | 日志目录 |
modules | 模块目录 |
plugins | 插件目录 |
解压完成后进入bin目录,双击运行elasticsearch.bat
测试访问: http://localhost:9200/
注意事项一:
ElasticSearch是使用java开发的,且本版本的ES需要JDK版本要是1.8以上,所以安装ElasticSearch之前保证JDK1.8+安装完毕,并正确的配置好JDK环境变量,否则启动ElasticSearch失败。
注意事项二:
出现闪退,通过路径访问发现“空间不足”
修改config/jvm.options文件的22行23行,把2改成1,让Elasticsearch启动的时候占用1个G的内存。
-Xmx512m:设置JVM最大可用内存为512M。
-Xms512m:设置JVM初始内存512m。此值可设置与-Xmx相同,以避免每次垃圾回收完成后JVM重新分配内存。
ik分词器安装
IK分词器简介
IKAnalyzer是一个开源的,基于Java语言开发的轻量级的中文分词工具包。从2006年12月推出1.0版开始,IKAnalyzer已经推出 了3个大版本。最初,它是以开源项目Lucene为应用主体的,结合词典分词和文法分析算法的中文分词组件。新版本的IKAnalyzer3.0则发展为面向Java的公用分词组件,独立于Lucene项目,同时提供了对Lucene的默认优化实现。
IK分词器3.0的特性如下:
1)采用了特有的“正向迭代最细粒度切分算法“,具有60万字/秒的高速处理能力。
2)采用了多子处理器分析模式,支持:英文字母(IP地址、Email、URL)、数字(日期,常用中文数量词,罗马数字,科学计数法),中文词汇(姓名、地名处理)等分词处理。
3)对中英联合支持不是很好,在这方面的处理比较麻烦.需再做一次查询,同时是支持个人词条的优化的词典存储,更小的内存占用。
4)支持用户词典扩展定义。
5)针对Lucene全文检索优化的查询分析器IKQueryParser;采用歧义分析算法优化查询关键字的搜索排列组合,能极大的提高Lucene检索的命中率。
IK分词器的安装
下载:
GitHub仓库地址:https://github.com/medcl/elasticsearch-analysis-ik
解压安装IK插件:
将ik压缩包
直接解压到ElasticSearch的plugins\ik\目录下,注意目录结构,解压后的zip不要放在plugins目录下
kibana安装-客户端
elasticsearch服务是一个restful风格的http服务。我们可以采用postman作为客户端来进行操作,elastic stack官方也给我们提供了kibana来进行客户端操作,这个相比postman要友好一点,因为里面有些自动补全的代码提示
下载地址: https://www.elastic.co/cn/downloads/past-releases/kibana-7-8-0
解压文件:进入到config目录,修改kibana.yml文件:
Kibana默认端口:5601
Kibana连接elasticsearch服务器的地址:elasticsearch.hosts: [“http://localhost:9200"]
修改kibana配置支持中文:i18n.locale: “zh-CN”
运行访问
执行kibana-7.8.0\bin\kibana.bat
RabbitMQ
RabbitMQ简介
以商品订单场景为例,
如果商品服务和订单服务是两个不同的微服务,在下单的过程中订单服务需要调用商品服务进行扣库存操作。按照传统的方式,下单过程要等到调用完毕之后才能返回下单成功,如果网络产生波动等原因使得商品服务扣库存延迟或者失败,会带来较差的用户体验,如果在高并发的场景下,这样的处理显然是不合适的,那怎么进行优化呢?这就需要消息队列登场了。
消息队列提供一个异步通信机制,消息的发送者不必一直等待到消息被成功处理才返回,而是立即返回。消息中间件负责处理网络通信,如果网络连接不可用,消息被暂存于队列当中,当网络畅通的时候在将消息转发给相应的应用程序或者服务,当然前提是这些服务订阅了该队列。如果在商品服务和订单服务之间使用消息中间件,既可以提高并发量,又降低服务之间的耦合度。
RabbitMQ就是这样一款消息队列。RabbitMQ是一个开源的消息代理的队列服务器,用来通过普通协议在完全不同的应用之间共享数据。
典型应用场景:
- 异步处理。把消息放入消息中间件中,等到需要的时候再去处理。
- 流量削峰。例如秒杀活动,在短时间内访问量急剧增加,使用消息队列,当消息队列满了就拒绝响应,跳转到错误页面,这样就可以使得系统不会因为超负载而崩溃
安装rabbitMQ
#拉取镜像
docker pull rabbitmq:3.8-management
#创建容器启动
docker run -d --restart=always -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:3.8-management
abbitMQ服务后台
管理后台:http://IP:15672 ,用户名和密码默认:guest
在SpringBoot中引入,导入依赖:
<!--rabbitmq消息队列-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
配置application.yml
:
spring:
rabbitmq:
host: 192.168.10.100
port: 5672
username: guest
password: guest
publisher-confirm-type: CORRELATED #发布确认模式,消息是否被成功发送到交换机
publisher-returns: true
listener:
simple:
prefetch: 1
concurrency: 3
acknowledge-mode: manual #消费端手动确认