来看头条

杨大大...大约 35 分钟

1.项目介绍

开发技术:Spring Cloud + Spring Boot + MybatisPlus + Redis + MySQL + Mongodb + Zookeeper + kafka + ElasticSearch + Docker + 第三方技术阿里云OSS;

项目背景:”来看头条“ 项目类似于今日头条,是一个新闻资讯类项目。该项目由用户端和自媒体端组成。在用户端,实现了用户通过app端登录功能、浏览文章功能、搜索文章功能、用户历史记录功能。在自媒体端,实现了自媒体管理员登录功能、发布文章功能、删除文章功能、上传素材功能、文章内容审核功能

项目重难点: 网关搭建;文章详情静态化及存储;文章自动审核及延迟发布;分布式锁解决集群下的方法抢占执行;热点文章实时计算

技术栈的具体应用

  • Spring-Cloud-Gateway : 微服务之前架设的网关服务,实现服务注册中的API请求路由,以及控制流速控制和熔断处理都是常用的架构手段,而这些功能Gateway天然支持
  • 运用Spring Boot快速开发框架,构建项目工程;并结合Spring Cloud全家桶技术,实现app后端、自媒体等微服务。
  • 运用Spring Cloud Alibaba Nacos作为项目中的注册中心和配置中心
  • 运用mybatis-plus作为持久层提升开发效率
  • 采用kafka作为消息服务中间件,把自媒体文章上下架放进消息队列;通过用户的行为(点赞、评论、喜欢)实时记录用户数据,通过kafkaStream流式计算最新的数据;与客户端系统消息通知
  • 运用Redis缓存技术,实现热数据的计算,提升系统性能指标,同时作为消息中间件异步消费任务。
  • 使用Mysql存储用户数据,以保证数据查询的高性能
  • 使用Mongodb存储用户历史记录数据,以保证用户热数据高扩展和高性能指标
  • 运用AI技术,来完成系统自动化功能,以提升效率及节省成本,比如文章审核

2.优化

2.1优化一

**缺陷 :**写操作(定时刷新)比较频繁的话导致 cache 中的数据会被频繁被删除,这样会影响缓存命中率 。

解决办法:

  • 数据库和缓存数据强一致场景 :更新 db 的时候同样更新 cache,不过我们需要加一个分布式锁来保证更新 cache 的时候不存在线程安全问题。

2.2优化二

**缺陷:**消费者丢失消息的情况

我们知道消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。

当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。自动提交的话会有一个问题,试想一下,当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被自动提交了。

解决办法:

  • 我们手动关闭自动提交 offset,每次在真正消费完消息之后再自己手动提交 offset 。 但是这样会带来消息被重新消费的问题。比如你刚刚消费完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两次。
  • 解决Kafka重复消费,开启kafka本身存在的幂等性。acks=allenable.idempotence=true来保证幂等性,这样 Producer 在重试发送消息时,Broker端就可以过滤重复消息。

2.3优化三

2.3.1存储技术选型:

功能阿里云ossminio
文件管理
文件管理权限策略
纠删码数据修复
持续备份
使用SDK管理资源
监控
版本控制开启版本控制后,针对文件的覆盖和删除操作将会以历史版本的形式保存下来,在错误覆盖或者删除文件后,能够将存储空间中存储的文件恢复至任意时刻的历史版本×
在指定时间内自动批量删除支持生命周期规则,您可以通过生命周期规则定期将非热门数据转换为低频访问、归档存储或冷归档存储,并删除过期数据×
提升数据上传、下载速率支持传输加速服务受限于服务器网卡带宽
高性能、高吞吐的数据访问服务缓存OSS中的热点文件,并提供高性能、高吞吐量的数据访问服务设置访问一个对象并下载下来进行缓存,那接下来的请求都会直接访问缓存磁盘上的对象,直至其过期失效
加密、保护敏感数据×
防护DDoS攻击当受保护的Bucket遭受大流量攻击时,OSS高防会将攻击流量牵引至高防集群进行清洗,并将正常访问流量回源到目标Bucket,确保业务的正常进行。×
记录资源的访问信息可以通过日志功能完成OSS的操作审计、访问统计、异常事件回溯和问题定位等工作×
控制流量在上传、下载、拷贝文件时进行流量控制,以保证您其他应用的网络带宽×
对数据进行分析和处理图片处理、视频截帧、智能媒体管理×
使用工具管理资源OSS支持图形化工具、命令行工具、文件挂载工具、FTP工具等方便您管理OSS资源支持图形化工具、命令行工、各类SDK
同城容灾、异地容灾×
数据合规保留允许用户以“不可删除、不可篡改”方式保存和使用数据。×

阿里云OSS是一个成熟的云服务,由阿里云提供技术支持和服务保障,使用方便,与其他阿里云服务集成度高。MinIO是开源软件,需要自行部署和维护。OSS有成熟的访问权限控制,安全性好。MinIO安全性取决于自行配置。

2.3.2数据库框架选型:

MyBatis和MyBatis-Plus都是Java语言中非常常用的ORM框架,二者有以下区别:

实现方式不同:

MyBatis是基于XML或注解方式进行数据库操作的持久化框架,它提供了简单的CRUD操作及动态SQL生成等功能。而MyBatis-Plus是在MyBatis框架上的封装,通过对于增强简化后的API更加方便地进行开发,并且在性能、效率和易用性上都有一定的优化。

编程风格有所差异:

MyBatis的编程风格更加传统,需要定义mapper.xml文件并根据传入的参数使用相应的SQL查询语句,需要实现 Mybatis 提供的各种方法;而MyBatis-Plus具有许多针对CRUD进行的简化方法,通过继承BaseMapper以及使用Lambda表达式,可以让我们像SpringDataJPA类似地使用接口编程方式进行数据库操作。

下面通过代码示例来演示一下MyBatis和MyBatis-Plus的区别:

  • MyBatis的实现

首先在Mybatis中需要配置sql mapper映射文件,如下所示:

<!-- userMapper.xml -->
<mapper namespace="com.demo.dao.UserMapper">

    <select id="getUserById" parameterType="java.lang.Integer" resultType="com.demo.entity.User">
        select * from t_user where id=#{id}
    </select>

</mapper>

然后通过Mybatis的SqlSession执行相关方法实现数据查询:

SqlSessionFactory sessionFactory = new SqlSessionFactoryBuilder().build(configuration);
SqlSession sqlSession = sessionFactory.openSession();
User user = sqlSession.selectOne("com.demo.dao.UserMapper.getUserById", 1);
sqlSession.close();
  • MyBatis-Plus的实现

在MyBatis-Plus中则要比上述方式简单得多,并且提供了更加方便的CRUD操作。例如我们可以定义一个接口继承BaseMapper接口并调用其中提供的方法来操作数据库,如下所示:

public interface UserMapper extends BaseMapper<User> {

}

以下是从数据库中根据ID查询数据时,可以在Service层直接调用BaseMapper中提供的selectById()方法:

@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {
    @Override
    public User getUserById(Long id) {
        return baseMapper.selectById(id);
    }
}

参考:https://blog.csdn.net/qq_51447496/article/details/131166800open in new window

2.3.3消息队列选型:

详细请参考:https://www.youngxy.top/page/Kafka/Kafka.htmlopen in new window

Kafka 和 RocketMQ 都支持 10w 级别的高吞吐量。

Kafka 一开始的目的就是用于日志收集和传输,适合有大量数据产生的互联网业务,特别是大数据领域的实时计算、日志采集等场景,用 Kafka 绝对没错,社区活跃度高,业内标准。

RocketMQ 特别适用于金融互联网领域这类对于可靠性要求很高的场景,比如订单交易等,而且 RocketMQ 是阿里出品的,经历过那么多次淘宝双十一的考验,大品牌,在稳定性值得信赖。但如果阿里不再维护这个技术了,社区有可能突然黄掉的风险。因此如果公司对自己的技术实力有自信,基础架构研发实力较强,推荐用 RocketMQ。

RabbitMQ 适用于公司对外提供能力,可能会有很多主题接入的中台业务场景,毕竟它是百万级主题数的。它的时效性是毫秒级的,但实际毫秒级和微秒级在感知上没有什么太大的区别,所以它的这一大优点并不太会作为考量标准。同时,它的功能是比较完善的,开源社区活跃度高,能解决开发中遇到的bug,所以万级别数据量业务场景的小公司可以优先选择功能完善的RabbitMQ。它的缺点就是用 Erlang 语言编写,所以很多开发人员很难去看懂源码并进行二次开发和维护,也就是说对于公司来说可能处于不可控的状态。

ActiveMQ 现在很少有人用,没怎么经过大规模吞吐量场景的考验,社区不怎么活跃,官方社区现在对 ActiveMQ 5.x 维护也越来越少,所以不推荐使用。

消息中间件对比

特性ActiveMQRabbitMQRocketMQKafka
开发语言javaerlangjavascala
单机吞吐量万级万级10万级100万级
时效性msusmsms级以内
可用性高(主从)高(主从)非常高(分布式)非常高(分布式)
功能特性成熟的产品、较全的文档、各种协议支持好并发能力强、性能好、延迟低MQ功能比较完善,扩展性佳只支持主要的MQ功能,主要应用于大数据领域

消息中间件对比-选择建议

消息中间件建议
Kafka追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务
RocketMQ可靠性要求很高的金融互联网领域,稳定性高,经历了多次阿里双11考验
RabbitMQ性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的RabbitMQ

2.4优化四

缺陷:对于变量存在多线程并发竞争

解决办法:

为变量设置ThreadLocal。

2.5身份验证怎么做的?

AuthorizedFilter + AppJwtUtil
  1. 用户向服务器发送用户名、密码以及验证码用于登陆系统。用户进入网关开始登陆,网关过滤器进行判断,如果是登录,则路由到后台管理微服务进行登录。
  2. 如果用户用户名、密码以及验证码校验正确的话,服务端会返回已经签名的 Token,也就是 JWT。
  3. 用户以后每次向后端发请求都在 Header 中带上这个 JWT ,再次进入网关开始访问,网关过滤器接收用户携带的TOKEN。
  4. 服务端检查 JWT 并从中获取用户相关信息。网关过滤器解析TOKEN ,判断是否有权限,如果有,则放行,如果没有则返回未认证错误。

两点建议:

  1. 建议将 JWT 存放在 localStorage 中,放在 Cookie 中会有 CSRF 风险。
  2. 请求服务端并携带 JWT 的常见做法是将其放在 HTTP Header 的 Authorization 字段中(Authorization: Bearer Token)。

乐观锁:

使用版本号。

分布式id:

随着业务的增长,文章表可能要占用很大的物理存储空间,为了解决该问题,后期使用数据库分片技术。将一个数据库进行拆分,通过数据库中间件连接。如果数据库中该表选用ID自增策略,则可能产生重复的ID,此时应该使用分布式ID生成策略来生成ID。

文章端相关的表都使用雪花算法生成id,包括ap_article、 ap_article_config、 ap_article_content

mybatis-plus已经集成了雪花算法,完成以下两步即可在项目中集成雪花算法

第一:在实体类中的id上加入如下配置,指定类型为id_worker

@TableId(value = "id",type = IdType.ID_WORKER)
private Long id;

第二:在application.yml文件中配置数据中心id和机器id

mybatis-plus:
  mapper-locations: classpath*:mapper/*.xml
  # 设置别名包扫描路径,通过该属性可以给包中的类注册别名
  type-aliases-package: com.heima.model.article.pojos
  global-config:
    datacenter-id: 1
    workerId: 1

2.6网关搭建:

思路分析:

  1. 用户进入网关开始登陆,网关过滤器进行判断,如果是登录,则路由到后台管理微服务进行登录
  2. 用户登录成功,后台管理微服务签发JWT TOKEN信息返回给用户
  3. 用户再次进入网关开始访问,网关过滤器接收用户携带的TOKEN
  4. 网关过滤器解析TOKEN ,判断是否有权限,如果有,则放行,如果没有则返回未认证错误

具体实现:

第一:

​ 在认证过滤器中需要用到jwt的解析,所以需要把工具类拷贝一份到网关微服务

第二:

​ 在网关微服务中新建全局过滤器

2.7文章详情静态化及存储:

文章详情静态化:

​ FreeMarker 是一款模板引擎: 即一种基于模板和要改变的数据, 并用来生成输出文本(HTML网页,电子邮件,配置文件,源代码等)的通用工具。 它不是面向最终用户的,而是一个Java类库,是一款程序员可以嵌入他们所开发产品的组件。

​ 模板编写为FreeMarker Template Language (FTL)。它是简单的,专用的语言, 不是像PHP那样成熟的编程语言。 那就意味着要准备数据在真实编程语言中来显示,比如数据库查询和业务运算, 之后模板显示已经准备好的数据。在模板中,你可以专注于如何展现数据, 而在模板之外可以专注于要展示什么数据。

存储:AliyunOSS

对象存储可提供更好的数据保护,加密、保护敏感数据。

2.8文章自动审核及延迟发布:

文章自动审核:

1 自媒体端发布文章后,开始审核文章(异步线程的方式审核文章,在自动审核的方法上加上@Async注解(标明要异步调用),在自媒体引导类中使用@EnableAsync注解开启异步调用)

2 审核的主要是审核文章的内容(文本内容和图片)

3 借助第三方提供的接口审核文本

4 借助第三方提供的接口审核图片,由于图片存储到OSS中,需要先下载才能审核

5 如果审核失败,则需要修改自媒体文章的状态,status:2 审核失败 status:3 转到人工审核

6 如果审核成功,则需要在文章微服务中创建app端需要的文章:

​ 在文章审核成功以后需要在app的article库中新增文章数据:

  • ​ 保存文章信息 ap_article

  • ​ 保存文章配置信息 ap_article_config

  • ​ 保存文章内容 ap_article_content

WmNewsAutoScanServiceImpl:

package com.xy.wemedia.service.impl;

import com.alibaba.fastjson.JSONArray;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.xy.common.aliyun.GreenImageScan;
import com.xy.common.aliyun.GreenTextScan;
import com.xy.common.aliyun.ImageScan;
import com.xy.common.aliyun.TextScan;
import com.xy.common.tess4j.Tess4jClient;
import com.xy.file.service.FileStorageService;
import com.xy.model.article.dtos.ArticleDto;
import com.xy.model.common.dtos.ResponseResult;
import com.xy.model.wemedia.pojos.WmChannel;
import com.xy.model.wemedia.pojos.WmNews;
import com.xy.model.wemedia.pojos.WmSensitive;
import com.xy.model.wemedia.pojos.WmUser;
import com.xy.utils.common.SensitiveWordUtil;
import com.xy.wemedia.mapper.WmChannelMapper;
import com.xy.wemedia.mapper.WmNewsMapper;
import com.xy.wemedia.mapper.WmSensitiveMapper;
import com.xy.wemedia.mapper.WmUserMapper;
import com.xy.wemedia.service.WmNewsAutoScanService;
import com.xy.apis.article.IArticleClient;
import lombok.extern.slf4j.Slf4j;
import net.sourceforge.tess4j.TesseractException;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.*;
import java.util.stream.Collectors;

/**
 * @author 杨路恒
 */
@Service
//@Transactional
@Slf4j
public class WmNewsAutoScanServiceImpl implements WmNewsAutoScanService {
    @Autowired
    private WmNewsMapper wmNewsMapper;
    /**
     * 自媒体文章审核
     * @param id    自媒体文章id
     */
    @Override
    @Async  //标明当前方法是一个异步方法
    public void autoScanWmNews(Integer id) {
        //1.查询自媒体文章
        WmNews wmNews = wmNewsMapper.selectById(id);
        if (wmNews == null){
            throw new RuntimeException("WmNewsAutoScanServiceImpl-文章不存在");
        }
        if (wmNews.getStatus().equals(WmNews.Status.SUBMIT.getCode())){
            //从内容中提取纯文本内容和图片
            Map<String,Object> textAndImages = handleTextAndImages(wmNews);
            //自管理的敏感词过滤
            boolean isSensitive = handleSensitiveScan((String) textAndImages.get("content"), wmNews);
            if(!isSensitive){
                return;
            }
            //2.审核文本内容  阿里云接口
            boolean isTextScan = handleTextScan((String) textAndImages.get("content"),wmNews);
            if (!isTextScan){
                return;
            }
            //3.审核图片  阿里云接口
            boolean isImageScan =  handleImageScan((List<String>) textAndImages.get("images"),wmNews);
//            isImageScan = true;
            if (!isImageScan){
                return;
            }
            //4.审核成功,保存app端的相关的文章数据
            ResponseResult responseResult = saveAppArticle(wmNews);
            if(!responseResult.getCode().equals(200)){
                throw new RuntimeException("WmNewsAutoScanServiceImpl-文章审核,保存app端相关文章数据失败");
            }
            //回填article_id
            wmNews.setArticleId((Long) responseResult.getData());
            updateWmNews(wmNews,(short) 9,"审核成功");
//            saveAppArticle(wmNews);
        }

    }

    @Autowired
    private WmSensitiveMapper wmSensitiveMapper;

    /**
     * 自管理的敏感词审核
     * @param content
     * @param wmNews
     * @return
     */
    private boolean handleSensitiveScan(String content, WmNews wmNews) {
        boolean flag =true;
        //获取所有的敏感词
        List<WmSensitive> wmSensitives = wmSensitiveMapper.selectList(Wrappers.<WmSensitive>lambdaQuery().select(WmSensitive::getSensitives));
        List<String> sensitiveList = wmSensitives.stream().map(WmSensitive::getSensitives).collect(Collectors.toList());
        //初始化敏感词库
        SensitiveWordUtil.initMap(sensitiveList);
        //查看文章中是否包含敏感词
        Map<String, Integer> map = SensitiveWordUtil.matchWords(content);
        if (map.size() > 0){
            updateWmNews(wmNews, (short) 2,"当前文章中存在违规内容" + map);
            flag = false;
        }
        return flag;
    }

    @Autowired
    private IArticleClient iArticleClient;
    @Autowired
    private WmChannelMapper wmChannelMapper;
    @Autowired
    private WmUserMapper wmUserMapper;

    /**
     * 修改文章内容
     * @param wmNews
     * @param i
     * @param reason
     */
    private void updateWmNews(WmNews wmNews, short i, String reason) {
        wmNews.setStatus(i);
        wmNews.setReason(reason);
        wmNewsMapper.updateById(wmNews);
    }

    /**
     * 保存app端相关的文章数据
     * @param wmNews
     * @return
     */
    private ResponseResult saveAppArticle(WmNews wmNews) {
        ArticleDto articleDto = new ArticleDto();
        //属性的拷贝
        BeanUtils.copyProperties(wmNews,articleDto);
        //文章的布局
        articleDto.setLayout(wmNews.getType());
        //频道
        WmChannel wmChannel = wmChannelMapper.selectById(wmNews.getChannelId());
        if (wmChannel != null){
            articleDto.setChannelName(wmChannel.getName());
        }
        //作者
        articleDto.setAuthorId(wmNews.getUserId().longValue());
        WmUser wmUser = wmUserMapper.selectById(wmNews.getUserId());
        if (wmUser != null){
            articleDto.setAuthorName(wmUser.getName());
        }
        //设置文章id
        if (wmNews.getArticleId() != null){
            articleDto.setId(wmNews.getArticleId());
        }
        articleDto.setCreatedTime(new Date());
        System.out.println(articleDto.getId());
        ResponseResult responseResult = iArticleClient.saveArticle(articleDto);
        return responseResult;
    }
    @Autowired
    private FileStorageService fileStorageService;
    @Autowired
    private GreenImageScan greenImageScan;

    @Autowired
    private Tess4jClient tess4jClient;
    @Autowired
    private ImageScan imageScan;
    /**
     * 审核图片
     * @param images
     * @param wmNews
     * @return
     */
    private boolean handleImageScan(List<String> images, WmNews wmNews) {
        boolean flag = true;
        if (images == null || images.size() == 0){
            return flag;
        }
        //下载图片 minIO
        //图片去重
        images = images.stream().distinct().collect(Collectors.toList());
//        List<byte[]> imageList = new ArrayList<byte[]>();
//        try {
//            for (String image : images) {
//                byte[] bytes = fileStorageService.downLoadFile(image);
//                ByteArrayInputStream in = new ByteArrayInputStream(bytes);
//                BufferedImage imageFile = ImageIO.read(in);
//                String result = tess4jClient.doOCR(imageFile);
//                boolean isSensitive = handleSensitiveScan(result, wmNews);
//                if (!isSensitive){
//                    return isSensitive;
//                }
//                imageList.add(bytes);
//            }
//        } catch (Exception e) {
//            e.printStackTrace();
//        }
        List<String> imageList = images;
        //审核图片
        try {
//            Map map = greenImageScan.imageScan(imageList);
            Map map = imageScan.imageScan(imageList);
            if (map != null){
                //审核失败
                if(map.get("suggestion").equals("block")){
                    flag = false;
                    updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容");
                }
                //不确定信息  需要人工审核
                else if (map.get("suggestion").equals("review")){
                    flag = false;
                    updateWmNews(wmNews, (short)3,"当前文章中存在不确定内容");
                }
            }
        } catch (Exception e) {
            flag = false;
            e.printStackTrace();
        }
        return flag;
    }

    @Autowired
    private GreenTextScan greenTextScan;
    @Autowired
    private TextScan textScan;

    /**
     * 审核纯文本内容
     * @param content
     * @param wmNews
     * @return
     */
    private boolean handleTextScan(String content, WmNews wmNews) {
        boolean flag = true;
        if ((wmNews.getTitle() + "-" + content).length() == 0){
            return flag;
        }
        try {
            Map map = textScan.greenTextScan(wmNews.getTitle() + "-" + content);
            if (map != null){
                //审核失败
                if(map.get("suggestion").equals("block")){
                    flag = false;
                    updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容");
                }
                //不确定信息  需要人工审核
                else if (map.get("suggestion").equals("review")){
                    flag = false;
                    updateWmNews(wmNews, (short)3,"当前文章中存在不确定内容");
                }
            }
        } catch (Exception e) {
            flag = false;
            e.printStackTrace();
        }

        return flag;
    }

    /**
     *  1.从自媒体文章的内容中提取文本和图片
     *  2.提取文章的封面图片
     * @param wmNews
     * @return
     */
    private Map<String, Object> handleTextAndImages(WmNews wmNews) {
        //存储纯文本内容
        StringBuilder stringBuilder = new StringBuilder();
        List<String> images = new ArrayList<>();
        //1.从自媒体文章的内容中提取文本和图片
        if (StringUtils.isNoneBlank(wmNews.getContent())){
            List<Map> maps = JSONArray.parseArray(wmNews.getContent(), Map.class);
            for (Map map : maps) {
                if (map.get("type").equals("text")){
                    stringBuilder.append(map.get("value"));
                }
                if (map.get("type").equals("images")){
                    images.add((String) map.get("value"));
                }
            }
        }
        //2.提取文章的封面图片
        if (StringUtils.isNoneBlank(wmNews.getImages())){
            String[] split = wmNews.getImages().split(",");
            images.addAll(Arrays.asList(split));
        }
        Map<String,Object> resultMap = new HashMap<>();
        resultMap.put("content",stringBuilder.toString());
        resultMap.put("images",images);
        return resultMap;
    }

}

在heima-leadnews-wemedia服务中已经依赖了heima-leadnews-feign-apis工程,只需要在自媒体的引导类中开启feign的远程调用即可

注解为:@EnableFeignClients(basePackages = "com.heima.apis") 需要指向apis这个包。

package com.xy.wemedia;

import com.baomidou.mybatisplus.annotation.DbType;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;


/**
 * @author 杨路恒
 */
@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("com.xy.wemedia.mapper")
@EnableFeignClients(basePackages = "com.xy.apis")
@EnableAsync     //开启异步调用
@EnableScheduling   //开启调度任务
public class WemediaApplication {

    public static void main(String[] args) {
        SpringApplication.run(WemediaApplication.class,args);
    }

    /**
     * 分页拦截器
     * @return
     */
    @Bean
    public MybatisPlusInterceptor mybatisPlusInterceptor() {
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
        return interceptor;
    }
}

延迟发布:

有几个可能的原因项目中使用Redis而不是Kafka来实现延迟任务:

  1. 简单性。Redis相对于Kafka来说更简单,只需要一个Redis实例就可以实现延迟队列,而Kafka需要集群环境。Redis用起来更简单直接。
  2. 性能需求。如果项目对延迟任务的吞吐量要求不是很高,Redis完全可以满足需要。而Kafka更适合大吞吐量的场景。
  3. 资源消耗。Redis的资源消耗比较低,部署和运维成本较低。 Kafka的集群环境对资源要求较高。
  4. 已有Redis。如果项目已经使用了Redis,那么直接复用就可以实现延迟队列,无需引入额外的组件。
  5. 其他依赖。项目可能已经依赖了Redis的其他功能,比如缓存,消息等。那么直接复用Redis更加方便。

redis实现:zset数据类型的去重有序(分数排序)特点进行延迟。例如:时间戳作为score进行排序。

实现思路:

问题思路:

1.为什么任务需要存储在数据库中?

延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑。

2.为什么redis中使用两种数据类型,list和zset?

效率问题,算法的时间复杂度

3.在添加zset数据的时候,为什么不需要预加载?

任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止阻塞,只需要把未来几分钟要执行的数据存入缓存即可。

实现:

  • 延迟队列服务提供对外接口:提供远程的feign接口
  • 发布文章集成添加延迟队列接口
  • 修改发布文章代码:把之前的异步调用修改为调用延迟任务
  • 消费任务进行审核文章

4.为什么选用redis作为消息队列?

把 Redis 当作队列来使用时,会面临的 2 个问题:

  • Redis 本身可能会丢数据;
  • 面对消息挤压,内存资源会紧张;

所以,能不能将 Redis 作为消息队列来使用,关键看你的业务场景:

  • 如果你的业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的。

  • 如果你的业务有海量消息,消息积压的概率比较大,并且不能接受数据丢失,那么还是用专业的消息队列中间件吧。

5.未来数据定时刷新

方案1:keys 模糊匹配

keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞。

方案2:scan

SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。

在TaskServiceImpl中添加方法:

/**
     * 未来数据定时刷新
     */
    @Scheduled(cron = "0 */1 * * * ?")
    @Override
    public void refresh() {
        String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
        if (StringUtils.isNotBlank(token)){
            log.info("未来数据定时刷新---定时任务");

            // 获取所有未来数据集合的key值
            Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
            for (String futureKey : futureKeys) {
                String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
                //获取该组key下当前需要消费的任务数据
                Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
                if (!tasks.isEmpty()) {
                    //将这些任务数据添加到消费者队列中
                    cacheService.refreshWithPipeline(futureKey,topicKey,tasks);
                    System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
                }
            }
        }
    }

使用refreshWithPipeline管道方法。

2.9分布式锁解决集群下的方法抢占执行:

问题描述:

启动两台heima-leadnews-schedule服务,每台服务都会去执行refresh定时任务方法

分布式锁:

控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。

解决方案:

sexnx (SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。

这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作

  • 客户端A请求服务器设置key的值,如果设置成功就表示加锁成功
  • 客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败
  • 客户端A执行代码完成,删除锁
  • 客户端B在等待一段时间后再去请求设置key的值,设置成功
  • 客户端B执行代码完成,删除锁
/**
     * 加锁
     *
     * @param name
     * @param expire
     * @return
     */
    public String tryLock(String name, long expire) {
        name = name + "_lock";
        String token = UUID.randomUUID().toString();
        RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
        RedisConnection conn = factory.getConnection();
        try {

            //参考redis命令:
            //set key value [EX seconds] [PX milliseconds] [NX|XX]
            Boolean result = conn.set(
                    name.getBytes(),
                    token.getBytes(),
                    Expiration.from(expire, TimeUnit.MILLISECONDS),
                    RedisStringCommands.SetOption.SET_IF_ABSENT //NX
            );
            if (result != null && result) {
                return token;
            }
        } finally {
            RedisConnectionUtils.releaseConnection(conn, factory,false);
        }
        return null;
    }

2.10热点文章实时计算:

思路说明:

2.11服务降级处理

  • 服务降级是服务自我保护的一种方式,或者保护下游服务的一种方式,用于确保服务不会受请求突增影响变得不可用,确保服务不会崩溃

  • 服务降级虽然会导致请求失败,但是不会导致阻塞。

实现步骤:

①:在heima-leadnews-feign-api编写降级逻辑

/**
 * feign失败配置
 */
@Component
public class IArticleClientFallback implements IArticleClient {
    @Override
    public ResponseResult saveArticle(ArticleDto dto)  {
        return ResponseResult.errorResult(AppHttpCodeEnum.SERVER_ERROR,"获取数据失败");
    }
}

在自媒体微服务中添加类,扫描降级代码类的包

@Configuration
@ComponentScan("com.heima.apis.article.fallback")
public class InitConfig {
}

②:远程接口中指向降级代码

@FeignClient(value = "leadnews-article",fallback = IArticleClientFallback.class)
public interface IArticleClient {

    @PostMapping("/api/v1/article/save")
    public ResponseResult saveArticle(@RequestBody ArticleDto dto);
}

③:客户端开启降级heima-leadnews-wemedia

在wemedia的nacos配置中心里添加如下内容,开启服务降级,也可以指定服务响应的超时的时间

feign:
  # 开启feign对hystrix熔断降级的支持
  hystrix:
    enabled: true
  # 修改调用超时时间
  client:
    config:
      default:
        connectTimeout: 2000
        readTimeout: 2000

openfeign的使用

IWemediaClient:

package com.xy.apis.wemedia;

import com.xy.model.common.dtos.ResponseResult;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;

@FeignClient("leadnews-wemedia")
public interface IWemediaClient {

    @GetMapping("/api/v1/channel/list")
    public ResponseResult getChannels();
}

WemediaClient:

package com.xy.wemedia.feign;

import com.xy.apis.wemedia.IWemediaClient;
import com.xy.model.common.dtos.ResponseResult;
import com.xy.wemedia.service.WmChannelService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class WemediaClient implements IWemediaClient {

    @Autowired
    private WmChannelService wmChannelService;

    @GetMapping("/api/v1/channel/list")
    @Override
    public ResponseResult getChannels() {
        return wmChannelService.findAll();
    }
}

IScheduleClient:

package com.xy.apis.schedule;

import com.xy.model.common.dtos.ResponseResult;
import com.xy.model.schedule.dtos.Task;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;

/**
 * @author 杨路恒
 */
@FeignClient("leadnews-schedule")
public interface IScheduleClient {

    /**
     * 添加任务
     * @param task   任务对象
     * @return       任务id
     */
    @PostMapping("/api/v1/task/add")
    public ResponseResult addTask(@RequestBody Task task);

    /**
     * 取消任务
     * @param taskId        任务id
     * @return              取消结果
     */
    @GetMapping("/api/v1/task/cancel/{taskId}")
    public ResponseResult cancelTask(@PathVariable("taskId") long taskId);

    /**
     * 按照类型和优先级来拉取任务
     * @param type
     * @param priority
     * @return
     */
    @GetMapping("/api/v1/task/pull/{type}/{priority}")
    public ResponseResult pull(@PathVariable("type") int type,@PathVariable("priority")  int priority);
}

ScheduleClient:

package com.xy.schedule.feign;

import com.xy.model.common.dtos.ResponseResult;
import com.xy.model.schedule.dtos.Task;
import com.xy.schedule.service.TaskService;
import com.xy.apis.schedule.IScheduleClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * @author 杨路恒
 */
@RestController
public class ScheduleClient implements IScheduleClient {
    @Autowired
    private TaskService taskService;
    /**
     * 添加任务
     * @param task   任务对象
     * @return
     */
    @PostMapping("/api/v1/task/add")
    public ResponseResult addTask(@RequestBody Task task) {
        return ResponseResult.okResult(taskService.addTask(task));
    }

    /**
     * 取消任务
     * @param taskId        任务id
     * @return
     */
    @GetMapping("/api/v1/task/cancel/{taskId}")
    public ResponseResult cancelTask(@PathVariable("taskId") long taskId) {
        return ResponseResult.okResult(taskService.cancelTask(taskId));
    }

    /**
     * 按照类型和优先级来拉取任务
     * @param type
     * @param priority
     * @return
     */
    @GetMapping("/api/v1/task/pull/{type}/{priority}")
    public ResponseResult pull(@PathVariable("type") int type,@PathVariable("priority")  int priority) {
        return ResponseResult.okResult(taskService.pull(type,priority));
    }
}

2.12素材列表查询

在自媒体引导类中天mybatis-plus的分页拦截器:

package com.xy.wemedia;

import com.baomidou.mybatisplus.annotation.DbType;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;


/**
 * @author 杨路恒
 */
@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("com.xy.wemedia.mapper")
@EnableFeignClients(basePackages = "com.xy.apis")
@EnableAsync     //开启异步调用
@EnableScheduling   //开启调度任务
public class WemediaApplication {

    public static void main(String[] args) {
        SpringApplication.run(WemediaApplication.class,args);
    }

    /**
     * 分页拦截器
     * @return
     */
    @Bean
    public MybatisPlusInterceptor mybatisPlusInterceptor() {
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
        return interceptor;
    }
}

MyBatis Plus 是一个基于 MyBatis 的增强工具,它提供了很多方便的功能来简化数据库访问和操作。MyBatis Plus Interceptor 是 MyBatis Plus 的一个重要组件,用于在 MyBatis 执行 SQL 语句前后进行拦截和增强,实现各种功能,如分页查询、多租户支持、性能分析、乐观锁、逻辑删除等。

MyBatis Plus Interceptor 的原理如下:

  1. 拦截器接口:MyBatis Plus Interceptor 是一个接口,它定义了两个核心方法:interceptplugin
    • intercept 方法用于实现具体的拦截逻辑。在该方法中,你可以修改 SQL 语句、参数、执行结果等。
    • plugin 方法用于告诉 MyBatis Plus 该拦截器是否要拦截某个 Statement(SQL 语句执行对象)。
  2. 自定义拦截器:开发者可以自定义拦截器,实现 Interceptor 接口,然后将自定义的拦截器注册到 MyBatis Plus 中。
  3. 注册拦截器:在 MyBatis Plus 配置文件(通常是 XML 或 Java 配置类)中,你可以通过配置来注册自定义的拦截器。这样,当执行 SQL 语句时,拦截器会按照注册的顺序依次执行。
  4. 执行顺序:拦截器按照注册的顺序依次执行,首先执行第一个拦截器的 intercept 方法,然后将结果传递给下一个拦截器,直到所有拦截器都执行完毕。这允许多个拦截器协同工作,每个拦截器可以处理不同的任务。
  5. 功能增强:通过拦截器,你可以实现各种功能的增强,例如:
    • 分页查询:拦截查询 SQL,添加分页参数。
    • 多租户支持:拦截查询 SQL,添加租户信息过滤条件。
    • 性能分析:记录 SQL 执行时间等性能数据。
    • 乐观锁:在更新操作前检查版本号。
    • 逻辑删除:在删除操作前标记数据为已删除状态,而不是物理删除。

MyBatis Plus Interceptor 的原理是基于 Java 动态代理和反射机制实现的,它可以在 SQL 执行前后,对 SQL 语句和参数进行修改,或者对查询结果进行处理,从而实现各种功能的增强和定制。

MyBatis Plus 的分页插件是一个用于实现分页查询的重要组件,它基于 MyBatis 原生的 SQL 进行分页操作,而无需手动编写复杂的分页 SQL 语句。分页插件的原理如下:

  1. 分页查询条件构建:在代码中,你可以使用 MyBatis Plus 提供的 Page 类来构建分页查询条件。这包括指定要查询的页码、每页记录数以及其他查询条件,例如排序条件、查询条件等。
  2. 分页查询拦截:当执行一个带有分页条件的查询时,MyBatis Plus 的分页插件会拦截该查询请求。这是通过 MyBatis 的拦截器机制实现的。
  3. SQL 重写:分页插件会在拦截的时候对原始的 SQL 语句进行重写,以包含分页逻辑。这通常涉及到以下操作:
    • 将原始的查询语句包装在一个外部查询中,该外部查询会限制结果集的范围,以满足分页条件。
    • 添加 LIMITOFFSET 子句(具体取决于数据库的支持情况)来限制结果集的大小和偏移量。
    • 修改查询语句中的排序条件,以确保结果的顺序正确。
  4. SQL 参数处理:分页插件还会根据分页条件中的页码和每页记录数,计算出适当的偏移量和限制值,并将它们添加到 SQL 语句中。这确保了只返回分页查询条件指定的结果子集。
  5. 执行查询:经过 SQL 重写和参数处理后,MyBatis 执行修改后的 SQL 语句,从数据库中检索出分页结果集。
  6. 结果封装:查询结果被封装到一个 Page 对象中,该对象包括了查询的结果列表以及有关分页信息的元数据,如总记录数、总页数、当前页码等。
  7. 返回结果:分页查询的结果 Page 对象被返回给调用方,允许应用程序访问和操作分页数据。

总的来说,MyBatis Plus 分页插件的原理是在查询前拦截查询请求,重写 SQL 语句以包含分页逻辑,执行查询,将结果封装到一个 Page 对象中,最终返回给调用方。

2.13分布式id

随着业务的增长,文章表可能要占用很大的物理存储空间,为了解决该问题,后期使用数据库分片技术。将一个数据库进行拆分,通过数据库中间件连接。如果数据库中该表选用ID自增策略,则可能产生重复的ID,此时应该使用分布式ID生成策略来生成ID。

snowflake是Twitter开源的分布式ID生成算法,结果是一个long型的ID。

文章端相关的表都使用雪花算法生成id,包括ap_article、 ap_article_config、 ap_article_content

mybatis-plus已经集成了雪花算法,完成以下两步即可在项目中集成雪花算法

第一:在实体类中的id上加入如下配置,指定类型为id_worker

@TableId(value = "id",type = IdType.ID_WORKER)
private Long id;

第二:在application.yml文件中配置数据中心id和机器id

mybatis-plus:
  mapper-locations: classpath*:mapper/*.xml
  # 设置别名包扫描路径,通过该属性可以给包中的类注册别名
  type-aliases-package: com.heima.model.article.pojos
  global-config:
    datacenter-id: 1
    workerId: 1

datacenter-id:数据中心id(取值范围:0-31)

workerId:机器id(取值范围:0-31)

2.14自媒体文章上下架

自媒体微服务发送消息,WmNewsServiceImpl:

public ResponseResult downOrUp(WmNewsDto wmNewsDto) {
        //1.检查参数
        if (wmNewsDto.getId() == null){
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }
        //2.查询文章
        WmNews wmNews = getById(wmNewsDto.getId());
        if (wmNews == null){
            return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不存在");
        }
        //3.判断文章是否已发布
        if (!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"当前文章不是发布状态,不能上下架");
        }
        //4.修改文章enable
        if (wmNewsDto != null && wmNewsDto.getEnable() > -1 && wmNewsDto.getEnable() < 2){
            update(Wrappers.<WmNews>lambdaUpdate().set(WmNews::getEnable,wmNewsDto.getEnable()).eq(WmNews::getId,wmNews.getId()));
            //发送消息,通知article端修改文章配置
            if (wmNews.getArticleId() != null){
                Map<String,Object> map = new HashMap<>();
                map.put("articleId",wmNews.getArticleId());
                map.put("enable",wmNews.getEnable());
                kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
            }
        }

        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }

在article端编写监听,接收数据, ArticleIsDownListener:

package com.xy.article.listener;

import com.alibaba.fastjson.JSON;
import com.xy.common.constants.WmNewsMessageConstants;
import com.xy.article.service.ApArticleConfigService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @author 杨路恒
 */
@Component
@Slf4j
public class ArticleIsDownListener {
    @Autowired
    private ApArticleConfigService apArticleConfigService;
    @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
    public void onMessage(String message) {
        if (StringUtils.isNoneBlank(message)){
            Map map = JSON.parseObject(message, Map.class);
            apArticleConfigService.updateByMap(map);
            log.info("article端文章配置修改,articleId={}",map.get("articleId"));
        }
    }
}

2.15es文章自动审核完成构建索引

文章微服务发送消息, ArticleFreemarkerServiceImpl:

private void createArticleESIndex(ApArticle apArticle, String content, String path) {
        SearchArticleVo vo = new SearchArticleVo();
        BeanUtils.copyProperties(apArticle,vo);
        vo.setContent(content);
        vo.setStaticUrl(path);

        kafkaTemplate.send(ArticleConstants.ARTICLE_ES_SYNC_TOPIC, JSON.toJSONString(vo));
    }

搜索微服务接收消息并创建索引, SyncArticleListener:

package com.xy.search.listener;

import com.alibaba.fastjson.JSON;
import com.xy.common.constants.ArticleConstants;
import com.xy.model.search.vos.SearchArticleVo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author 杨路恒
 */
@Component
@Slf4j
public class SyncArticleListener {
    @Autowired
    private RestHighLevelClient restHighLevelClient;

    @KafkaListener(topics = ArticleConstants.ARTICLE_ES_SYNC_TOPIC)
    public void onMessage(String message){
        if(StringUtils.isNotBlank(message)){

            log.info("SyncArticleListener,message={}",message);

            SearchArticleVo searchArticleVo = JSON.parseObject(message, SearchArticleVo.class);
            IndexRequest indexRequest = new IndexRequest("app_info_article");
            indexRequest.id(searchArticleVo.getId().toString());
            indexRequest.source(message, XContentType.JSON);
            try {
                restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
            } catch (IOException e) {
                e.printStackTrace();
                log.error("sync es error={}",e);
            }
        }

    }
}

2.16xxljob热点文章定时任务

实现思路:

频道列表远程接口准备:

计算完成新热数据后,需要给每个频道缓存一份数据,所以需要查询所有频道信息。

IWemediaClient:

package com.xy.article.service;

/**
 * @author 杨路恒
 */
public interface HotArticleService {
    /**
     * 计算热点文章
     */
    public void computeHotArticle();
}

WemediaClient:

package com.xy.wemedia.feign;

import com.xy.apis.wemedia.IWemediaClient;
import com.xy.model.common.dtos.ResponseResult;
import com.xy.wemedia.service.WmChannelService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class WemediaClient implements IWemediaClient {

    @Autowired
    private WmChannelService wmChannelService;

    @GetMapping("/api/v1/channel/list")
    @Override
    public ResponseResult getChannels() {
        return wmChannelService.findAll();
    }
}

定义业务层接口,在article微服务中新建任务类:

ComputeHotArticleJob:

package com.xy.article.job;

import com.xy.article.service.HotArticleService;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author 杨路恒
 */
@Component
@Slf4j
public class ComputeHotArticleJob {

    @Autowired
    private HotArticleService hotArticleService;

    @XxlJob("computeHotArticleJob")
    public void handle(){
        log.info("热文章分值计算调度任务开始执行...");
        hotArticleService.computeHotArticle();
        log.info("热文章分值计算调度任务结束...");

    }
}

HotArticleServiceImpl:

package com.xy.article.service.impl;

import com.alibaba.fastjson.JSON;
import com.xy.common.constants.ArticleConstants;
import com.xy.common.redis.CacheService;
import com.xy.model.article.pojos.ApArticle;
import com.xy.model.article.vos.HotArticleVo;
import com.xy.model.common.dtos.ResponseResult;
import com.xy.model.wemedia.pojos.WmChannel;
import com.xy.apis.wemedia.IWemediaClient;
import com.xy.article.mapper.ApArticleMapper;
import com.xy.article.service.HotArticleService;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

/**
 * @author 杨路恒
 */
@Service
@Transactional
@Slf4j
public class HotArticleServiceImpl implements HotArticleService {
    @Autowired
    private ApArticleMapper apArticleMapper;
    /**
     * 计算热点文章
     */
    @Override
    public void computeHotArticle() {
        //1.查询前5天的文章数据
        Date date = DateTime.now().minusDays(50).toDate();
        List<ApArticle> articleListByLast5days = apArticleMapper.findArticleListByLast5days(date);
        //2.计算文章的分值
        List<HotArticleVo> hotArticleVoList = computeHotArticle(articleListByLast5days);
        //3.为每个频道缓存30条分值较高的文章
        cacheTagToRedis(hotArticleVoList);

    }
    @Autowired
    private IWemediaClient iWemediaClient;
    @Autowired
    private CacheService cacheService;

    /**
     * 为每个频道缓存30条分值较高的文章
     * @param hotArticleVoList
     */
    private void cacheTagToRedis(List<HotArticleVo> hotArticleVoList) {
        //为每个频道缓存30条分值较高的文章
        ResponseResult responseResult = iWemediaClient.getChannels();
        if (responseResult.getCode().equals(200)){
            String jsonString = JSON.toJSONString(responseResult.getData());
            List<WmChannel> wmChannels = JSON.parseArray(jsonString, WmChannel.class);
            //检索出每个频道的文章
            if (wmChannels != null && wmChannels.size() > 0){
                for (WmChannel wmChannel : wmChannels) {
                    List<HotArticleVo> hotArticleVos = hotArticleVoList.stream().filter(x -> x.getChannelId().equals(wmChannel.getId())).collect(Collectors.toList());
                    //给文章进行排序,取30条分值较高的文章存入redis  key:频道id   value:30条分值较高的文章
                    sortAndCache(hotArticleVos, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + wmChannel.getId());
                }
            }
        }
        //设置推荐数据
        //给文章进行排序,取30条分值较高的文章存入redis  key:频道id   value:30条分值较高的文章
        sortAndCache(hotArticleVoList,ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);
    }

    /**
     * 排序并且缓存数据
     * @param hotArticleVos
     * @param s
     */
    private void sortAndCache(List<HotArticleVo> hotArticleVos, String s) {
        hotArticleVos = hotArticleVos.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
        if (hotArticleVos.size() > 30) {
            hotArticleVos = hotArticleVos.subList(0,30);
        }
        cacheService.set(s,JSON.toJSONString(hotArticleVos));
    }

    /**
     * 计算文章分值
     * @param articleListByLast5days
     * @return
     */
    private List<HotArticleVo> computeHotArticle(List<ApArticle> articleListByLast5days) {
        List<HotArticleVo> hotArticleVoList = new ArrayList<>();
        if (articleListByLast5days != null && articleListByLast5days.size() > 0) {
            for (ApArticle apArticle : articleListByLast5days) {
                HotArticleVo hotArticleVo = new HotArticleVo();
                BeanUtils.copyProperties(apArticle,hotArticleVo);
                Integer score = computeScore(apArticle);
                hotArticleVo.setScore(score);
                hotArticleVoList.add(hotArticleVo);
            }
        }
        return hotArticleVoList;
    }

    /**
     * 计算文章的具体分值
     * @param apArticle
     * @return
     */
    private Integer computeScore(ApArticle apArticle) {
        Integer score = 0;
        if (apArticle.getLikes() != null){
            score += apArticle.getLikes() + ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
        }
        if (apArticle.getViews() != null){
            score += apArticle.getViews();
        }
        if (apArticle.getComment() != null){
            score += apArticle.getComment() + ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
        }
        if (apArticle.getCollection() != null){
            score += apArticle.getCollection() + ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
        }
        return score;
    }
}

待优化

使用FastDFS作为静态资源存储器,在其上实现热静态资源缓存、淘汰等功能(待优化)

运用Hbase技术,存储系统中的冷数据,保证系统数据的可靠性(待优化)

运用ES搜索技术,对冷数据、文章数据建立索引,以保证冷数据、文章查询性能(待优化)

当用户 Logout 的话,JWT 也还有效。除非,我们在后端增加额外的处理逻辑比如将失效的 JWT 存储起来,后端先验证 JWT 是否有效再进行处理。

3.QPS估算方法、性能测试:

QPS(Query Per Second):每秒请求数,就是说服务器在一秒的时间内处理了多少个请求。

怎么估出每秒钟能处理多少请求呢?

方式一:自己在接口里记录

这种方式指的是在你的接口里,日志记录了能体现该接口特性的,并具有唯一性的字符串!

例如,下面这一段代码:

@RestController 
@RequestMapping("/home") 
public class IndexController {
 //省略
 @RequestMapping("/index") 
 String index() { 
 logger.info("渣渣烟");
 return "index"; 
 } 
} 

假设现在我要统计index这个接口的QPS!

OK,什么叫能体现该接口特性的字符串呢!就像上面的"渣渣烟"这个字符串,只在index这个接口里出现过,没在其他其他接口里出现过!因此,只要统计出"渣渣烟"这个字符串在日志里的出现次数,就能知道该接口的请求次数!

什么叫具有唯一性的字符串呢!所谓唯一性,指的是"渣渣烟"这个字符串,在这个接口的一次调用流程中,只出现一次!如果出现两次,就会导致到时候统计出来的次数会多一倍,所以尽量选择具有唯一性的字段!

方式二:利用tomcat的access log

tomcat自带的access log功能:

server.tomcat.accesslog.directory
设定log的目录,默认: logs
server.tomcat.accesslog.enabled
是否开启access log,默认: false

此时,你访问一次/home/index地址,会有下面这样日志:

127.0.0.1 - - [xxx] "POST /home/index HTTP/1.1" 200 138

执行一串命令:

cat xx.log |grep 'GET /mvc2'|cut -d ' ' -f4|uniq -c|sort -n -r 

JMeter测试:

自我介绍:

面试官,您好,首先很感谢您给我的面试机会!我叫杨路恒,今年24岁,山东济宁人,就读于陕西师范大学,今年研二,软件工程专业,研究方向为知识图谱。大学时间我主要利用课外时间学习了 Java 以及 一些框架 。在校期间参与了全国大学生数学建模竞赛和全国大学生英语竞赛,并且在数学建模比赛中担任队长并获得了陕西省一等奖。说到业余爱好的话,一个是比较喜欢通过博客整理分享自己所学知识,现在在CSDN上的粉丝数达到了3k+,访问量达到了44W+。 另一个是喜欢旅游和骑行的方式来放松。这就是我的自我介绍,感谢。