Java技术栈

本文最后更新于:2024年6月21日 凌晨

SSE实现消息推送

SSE(Server-Sent Events)

当涉及到部分请求,后端处理时间较长,使用常规Http请求,页面等待时间太长,对用户不友好,故考虑使用长链接进行消息推送,

可选方案有WebSocketSSE。WebSocket可实现双工通信,SSE仅支持服务端向客户端推送消息,根据实际使用场景,SSE即可满足,故选用SSE。

SpringBoot中,先编写SSE服务SseEmitterServer

@Slf4j
@Service
public class SseEmitterServer {
	private final static Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();
	
	/**
	 * 创建新连接
	 */
	public synchronized SseEmitter createSession() throws IOException {
		// 设置过期时间为0 表示永不过期
		SseEmitter sseEmitter = new SseEmitter(0L);
		String id = UUID.randomUUID().toString().replaceAll("-", "");
		if (!SSE_CACHE.containsKey(id)) {
			SSE_CACHE.put(id, sseEmitter);
			log.info("客户端:[{}]新建成功,当前客户端总数为:[{}]", id, SSE_CACHE.size());
		}
		return sseEmitter;
	}
	
	public void closeSession(String clientId){
		if (SSE_CACHE.containsKey(clientId)){
			SSE_CACHE.get(clientId).complete();
			SSE_CACHE.remove(clientId);
			log.info("客户端:[{}]关闭成功,当前剩余客户端总数为:[{}]", clientId, SSE_CACHE.size());
		}
	}
	
	/**
	 * 定时任务
	 */
	@Scheduled(fixedDelay = 3, initialDelay = 1)
	public void job() {
		if (SSE_CACHE.size() > 0){
			String msg = UUID.randomUUID().toString();
			System.out.println("定时任务发送消息:" + msg);
			for (Map.Entry<String, SseEmitter> entry : SSE_CACHE.entrySet()) {
				SseEmitter sseEmitter = SSE_CACHE.get(entry.getKey());
				try {
					sseEmitter.send(SseEmitter.event().reconnectTime(1000).id(entry.getKey()).data(msg));
				} catch (IOException e) {
					SSE_CACHE.remove(entry.getKey());
				}
			}
		}
	}
	
	public void sendMessageToClient(String clientId, String message) {
		SseEmitter sseEmitter = SSE_CACHE.get(clientId);
		if (sseEmitter != null) {
			try {
				sseEmitter.send(SseEmitter.event().reconnectTime(1000).id(clientId).data(message));
			} catch (IOException e) {
				SSE_CACHE.remove(clientId);
			}
		}
	}
	
	public void sendMessageToAllClients(String message) {
		for (Map.Entry<String, SseEmitter> entry : SSE_CACHE.entrySet()) {
			try {
				entry.getValue().send(SseEmitter.event().reconnectTime(1000).id(entry.getKey()).data(message));
			} catch (IOException e) {
				SSE_CACHE.remove(entry.getKey());
			}
		}
	}
}

接口使用:SseController

@RestController
@CrossOrigin
public class SseController {
	@Resource
	private SseEmitterServer sseEmitterServer;
	
	@GetMapping("/connect")
	public SseEmitter connect() throws IOException {
		return sseEmitterServer.createSession();
	}
	
	@GetMapping("/disconnect")
	public String disconnect(@RequestParam String clientId) {
		sseEmitterServer.closeSession(clientId);
		return "Disconnected";
	}
	
	@PostMapping("/send")
	public Result send(@RequestParam String msg) {
		sseEmitterServer.sendMessageToAllClients(msg);
		return Result.ok(null);
	}
}

在前端使用也非常的方便,下面是vue的一个小示例:

// 创建一个 EventSource 对象实例
let eventSource = null;
onMounted(() => {
    eventSource = new EventSource('http://localhost:9091/connect');
    eventSource.onmessage = (event) => {
        // 获取到后端返回的值
        message.value += String.fromCharCode(event.data);
    };
});

// 调用接口
await request.post("http://localhost:9091/send?msg=" + encodeURIComponent(text.value))

我们在后端提供两个接口,一个是前端进页面就进行SSE链接,另一个是调用接口,使用SSE返回给前端

初次进入页面时,进行连接

image-01

当发送消息调用接口,此处调用语言模型接口,以流式返回:

video-sse

ElasticSearch

ElasticSearch介绍

ElasticSearch概述

Elasticsearch (简称ES)是一个分布式、高扩展、高实时的、RESTful 风格的搜索与数据分析引擎。它能很方便的使大量数据具有搜索、分析和探索的能力。充分利用Elasticsearch的水平伸缩性,能使数据在生产环境变得更有价值。Elasticsearch 的实现原理主要分为以下几个步骤,首先用户将数据提交到Elasticsearch 数据库中,再通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据,当用户搜索数据时候,再根据权重将结果排名,打分,再将返回结果呈现给用户。

Elasticsearch是面向文档型数据库,一条数据在这里就是一个文档,用JSON作为文档序列化的格式,比如下面这条用户数据:

{
    "name" :     "John",
    "sex" :      "Male",
    "age" :      25,
    "birthDate": "1990/05/01",
    "about" :    "I love to go rock climbing",
    "interests": [ "sports", "music" ]
}

用Mysql这样的数据库存储就会容易想到建立一张User表,有各个字段等,在ElasticSearch里这就是一个文档,当然这个文档会属于一个User的类型,各种各样的类型存在于一个索引当中。这里有一份简易的将Elasticsearch和关系型数据术语对照表:

image-20230922141123048

索引(Index)

一个索引就是一个拥有几分相似特征的文档的集合。比如说,你可以有一个客户数据的索引,另一个产品目录的索引,还有一个订单数据的索引。一个索引由一个名字来标识(必须全部是小写字母),并且当我们要对这个索引中的文档进行索引、搜索、更新和删除的时候,都要使用到这个名字。在一个集群中,可以定义任意多的索引。

能搜索的数据必须索引,这样的好处是可以提高查询速度,比如:新华字典前面的目录就是索引的意思,目录可以提高查询速度。

Elasticsearch索引的精髓:一切设计都是为了提高搜索的性能。

类型(Type)

在一个索引中,你可以定义一种或多种类型。

一个类型是你的索引的一个逻辑上的分类/分区,其语义完全由你来定。通常,会为具有一组共同字段的文档定义一个类型。不同的版本,类型发生了不同的变化

版本 Type
5.x 支持多种type
6.x 只能有一种type
7.x 默认不再支持自定义索引类型(默认类型为:_doc)

文档(Document)

一个文档是一个可被索引的基础信息单元,也就是一条数据

比如:你可以拥有某一个客户的文档,某一个产品的一个文档,当然,也可以拥有某个订单的一个文档。文档以JSON(Javascript Object Notation)格式来表示,而JSON是一个到处存在的互联网数据交互格式。

在一个index/type里面,你可以存储任意多的文档。

字段(Field)

相当于是数据表的字段,对文档数据根据不同属性进行的分类标识。

映射(Mapping)

mapping是处理数据的方式和规则方面做一些限制,如:某个字段的数据类型、默认值、分析器、是否被索引等等。这些都是映射里面可以设置的,其它就是处理ES里面数据的一些使用规则设置也叫做映射,按着最优规则处理数据对性能提高很大,因此才需要建立映射,并且需要思考如何建立映射才能对性能更好。

ElasticSearch安装

下载软件

下载地址:ElasticSearch

推荐下载7.8版本Elasticsearch 7.8.0

windows安装

解压文件,目录结构如下:

目录 说明
bin 可执行脚本目录
config 配置目录
jdk 内置jdk目录
lib 类库
logs 日志目录
modules 模块目录
plugins 插件目录

解压完成后进入bin目录,双击运行elasticsearch.bat

image-20230922144605883

测试访问: http://localhost:9200/

image-20230922144622450

注意事项一:

ElasticSearch是使用java开发的,且本版本的ES需要JDK版本要是1.8以上,所以安装ElasticSearch之前保证JDK1.8+安装完毕,并正确的配置好JDK环境变量,否则启动ElasticSearch失败。

注意事项二

出现闪退,通过路径访问发现“空间不足”

修改config/jvm.options文件的22行23行,把2改成1,让Elasticsearch启动的时候占用1个G的内存。

-Xmx512m:设置JVM最大可用内存为512M。

-Xms512m:设置JVM初始内存512m。此值可设置与-Xmx相同,以避免每次垃圾回收完成后JVM重新分配内存。

ik分词器安装

IK分词器简介

IKAnalyzer是一个开源的,基于Java语言开发的轻量级的中文分词工具包。从2006年12月推出1.0版开始,IKAnalyzer已经推出 了3个大版本。最初,它是以开源项目Lucene为应用主体的,结合词典分词和文法分析算法的中文分词组件。新版本的IKAnalyzer3.0则发展为面向Java的公用分词组件,独立于Lucene项目,同时提供了对Lucene的默认优化实现。

IK分词器3.0的特性如下:

1)采用了特有的“正向迭代最细粒度切分算法“,具有60万字/秒的高速处理能力。

2)采用了多子处理器分析模式,支持:英文字母(IP地址、Email、URL)、数字(日期,常用中文数量词,罗马数字,科学计数法),中文词汇(姓名、地名处理)等分词处理。

3)对中英联合支持不是很好,在这方面的处理比较麻烦.需再做一次查询,同时是支持个人词条的优化的词典存储,更小的内存占用。

4)支持用户词典扩展定义。

5)针对Lucene全文检索优化的查询分析器IKQueryParser;采用歧义分析算法优化查询关键字的搜索排列组合,能极大的提高Lucene检索的命中率。

IK分词器的安装

下载:

GitHub仓库地址:https://github.com/medcl/elasticsearch-analysis-ik

下载地址: https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.8.0/elasticsearch-analysis-ik-7.8.0.zip

解压安装IK插件:

ik压缩包直接解压到ElasticSearch的plugins\ik\目录下,注意目录结构,解压后的zip不要放在plugins目录下

image-20230925094542451

kibana安装-客户端

elasticsearch服务是一个restful风格的http服务。我们可以采用postman作为客户端来进行操作,elastic stack官方也给我们提供了kibana来进行客户端操作,这个相比postman要友好一点,因为里面有些自动补全的代码提示

下载地址: https://www.elastic.co/cn/downloads/past-releases/kibana-7-8-0

解压文件:进入到config目录,修改kibana.yml文件:

Kibana默认端口:5601

image-20230925102018009

Kibana连接elasticsearch服务器的地址:elasticsearch.hosts: [“http://localhost:9200"]

image-20230925102056677

修改kibana配置支持中文:i18n.locale: “zh-CN”

image-20230925102125367

运行访问

执行kibana-7.8.0\bin\kibana.bat

image-20230925102347281

开始访问:http://127.0.0.1:5601

RabbitMQ

RabbitMQ简介

以商品订单场景为例,

如果商品服务和订单服务是两个不同的微服务,在下单的过程中订单服务需要调用商品服务进行扣库存操作。按照传统的方式,下单过程要等到调用完毕之后才能返回下单成功,如果网络产生波动等原因使得商品服务扣库存延迟或者失败,会带来较差的用户体验,如果在高并发的场景下,这样的处理显然是不合适的,那怎么进行优化呢?这就需要消息队列登场了。

消息队列提供一个异步通信机制,消息的发送者不必一直等待到消息被成功处理才返回,而是立即返回。消息中间件负责处理网络通信,如果网络连接不可用,消息被暂存于队列当中,当网络畅通的时候在将消息转发给相应的应用程序或者服务,当然前提是这些服务订阅了该队列。如果在商品服务和订单服务之间使用消息中间件,既可以提高并发量,又降低服务之间的耦合度。

RabbitMQ就是这样一款消息队列。RabbitMQ是一个开源的消息代理的队列服务器,用来通过普通协议在完全不同的应用之间共享数据。

典型应用场景:

  • 异步处理。把消息放入消息中间件中,等到需要的时候再去处理。

image-20230925134928246

  • 流量削峰。例如秒杀活动,在短时间内访问量急剧增加,使用消息队列,当消息队列满了就拒绝响应,跳转到错误页面,这样就可以使得系统不会因为超负载而崩溃

image-20230925134933915

安装rabbitMQ

#拉取镜像
docker pull rabbitmq:3.8-management
#创建容器启动
docker run -d --restart=always -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:3.8-management

abbitMQ服务后台

管理后台http://IP:15672 ,用户名和密码默认:guest

image-20230925141103100

在SpringBoot中引入,导入依赖:

<!--rabbitmq消息队列-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

配置application.yml

spring:
  rabbitmq:
    host: 192.168.10.100
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: CORRELATED  #发布确认模式,消息是否被成功发送到交换机
    publisher-returns: true
    listener:
      simple:
        prefetch: 1
        concurrency: 3
        acknowledge-mode: manual   #消费端手动确认

Java技术栈
https://junyyds.top/2023/09/22/Java技术栈/
作者
Phils
发布于
2023年9月22日
更新于
2024年6月21日
许可协议