Kafka 搭建日志系统

Kafka 搭建日志系统
MeteorCat在大部分情况下, 常规的 MySQL/PostgreSQL 就足够常规业务的 CURD 操作, 当业务扩展出来就开始有瓶颈.
最能体现这种情况就是 日志系统, 数据库当中的系统日志具有以下特征:
-
写入量极大且写入频繁: 查询比较少(就后台管理最多不超过100人), 但是写入量极大, 会出现单表超过50GB情况
-
查询条件复杂: 日志查询通常是按时间范围、关键字、级别、服务名等维度的组合查询, 细致一点还有针对某些属性查询
-
数据复用率低: 很多日志可能只需要查询半年或者90天数据, 其他时间段很少被查询到
-
数据结构可能比较灵活: 日志内容常包含 JSON、自由文本等非结构化数据(如接口请求参数、异常堆栈信息)
传统数据库虽然也能做此类数据保存, 但是在查询方面卡顿会非常严重, 并且 CPU 直接暴涨超过 100%.
也基于这种情况而需要外部其他工具来辅助, 也就是日常见到的传统分层处理架构:
-
MySQL/PostgreSQL(冷存储): 负责冷数据落地 -
Kafka(消息队列): 削峰填谷, 承接高并发日志写入, 同时作为热数据缓冲区 -
其他服务API: 负责接收客户端提交的日志数据, 投递到消息队列之中
Kafka 并不是作为数据库系统来使用, 而是以 JSON 格式记录日志文本记录好相关所需的核心元数据.
Kafka 原生查询能力弱, 可搭配 Kafka Streams 或 Flink 实现简单的实时过滤, 如果不想引入额外架构可以采用 Kafka Streams
本质上对于日志系统大部分情况只需要针对条件来做 区间/匹配/包含/不含 等功能即可, 都是相对比较简单的查询.
Kafka 的查询不再是称呼为 “查询”, 其实叫做 “过滤” 更加贴切
该方案适合中小规模日志场景(日写入量≤1000 万条,查询并发≤100QPS), 后续如果要扩大的大型日志场景(以日写入量过亿)就需要额外扩展.
强烈注意: 前期引入这部分方案不是个好选择, 盲目引入这种大数据架构会加大开发维护的复杂程度
基本上最开始都是写入到 MySQL/PostgreSQL 这类数据库, 就比如下面的 MySQL 用户行为日志上报表:
1 | # 应用事件上报表 |
这就是常规的客户端上报日志表, 这部分数据就是比较常规的日志表, 在量级比较小的情况通过 API 过滤异常数据之后直接写入数据库是没什么问题.
注意: Kafka 虽然作为数据队列, 但是提取之后的数据并不会直接销毁, 而是保存在本地不会去主动视为已使用去抛弃
后面就是规模上来时候可以把 Kafka 穿插到其中处理, 也就是需要其他服务器构建专门的 Kafka 消息队列服务, API 推送到 Kafka 队列.
Kafka 部署
注意: Kafka 服务会生成大量本地日志文件, 所以对于硬盘占用十分庞大, 必须要准备个大容量空间的扩展硬盘.
这里需要区分 Kafka 的版本, 因为在 2.8 之后不再集成内部 ZooKeeper 组件改由内部 KRaft 模式启动服务并且要求最低 Java17
官网手册: Kafka Doc
这里采用 Linux 的二进制处理即可, 需要手动配置环境处理:
1 | 创建 kafka 专门系统用户来管理(不允许登陆), 用于管理全部相关操作 |
/etc/systemd/system/kafka-server.service 系统单元如下:
1 | [Unit] |
最后运行启动即可:
1 | sudo systemctl daemon-reload # 刷新系统单元 |
至此 Kafka 服务就已经启动, 实际上可以理解为挂起消息队列服务而已; 而且目前启动会直接报错, 需要修改配置:
以下为 /data/kafka/config/server.properties 文件的具体配置和说明
1 | # 需要理解以下相关配置 |
修改完成之后重新初始化配置, 最好重新修复相关节点引导:
1 | 需要先关闭 systemd 服务重新生成 UUID |
如果没有问题即可, 后续就是准备创建订阅的消息频道的功能.
Topic 部署
上面的流程只是简单的挂起服务而已, 后面就是构建消息队列的具体功能, 首先就是创建频道(Topic)来监听推送过来的事件:
Topic 是 Kafka 中消息的分类/频道, 所有消息都必须发送到指定 Topic, 消费者也只能订阅 Topic 来消费消息
每个 Topic 会被分成多个分区(Partition), 分区是 Kafka 并行处理和数据存储的最小单位;
单机部署时副本数只能为 1, 集群部署可配置多副本提高可靠性.
这里先说下具体的相关命令:
1 | 确认目前的的 kafka 服务内部有多少 Topic |
如果出现丢失的 Topic 或者无法使用的情况, 可以排查下是否以下问题:
-
检测是否 log.dirs 丢失, 切忌不要将 log.dirs 目录丢在临时目录
-
cluster-id 不匹配导致元数据加载失败, 需要同步 cluster-id 一致
-
Kafka 服务未正常启动, 确实系统单元的 systemd 是否正确启动
如果没问题就可以通过查看列表确认消息队列已经完成, 现在就是准备构建 消费者(consumer) 和 生产者(producer) 数据链路.
生产和消费
-
生产者(producer): 将客户端上报的数据消息过滤清理掉非法部分, 然后放入 Topic 等待被消费
-
消费者(consumer): 创建数据通道监听对应 Topic, 有数据的时候提取这部分数据落地写入数据库
这里最简单的就是 API 接口接收到客户端数据过滤以下直接导入到 Topic 即可, 数据投递流程就如下顺序:
-
客户端上报数据
-
API接口接收到数据(清洗+生产消息)
-
API投递消息到 Kafka Topic
-
后台异步消费者任务接收到事件
-
后台异步消费者将数据同步落地到数据库/数仓
API 接收并清洗的部分可以用很多方式实现, 比如 PHP/Golang/Java 之类都支持 Kafka 直接投递, 这里不同语言都有实现不需要做说明.
这边已经先做好部分数据, 这部分测试数据可以先用着(注意: 文本文件导入必须每行存放一条 JSON 数据):
1 | {"action":1001,"appid":10001,"uid":100000,"sid":"srv_001","sname":"烈焰服务器","role_id":"role_100000_123","role_name":"玩家100000_龙","role_balance":"56890","role_level":"25","role_power":"56000","role_gender":1,"role_vip":"3","role_create_time":1735027200000,"role_level_up_time":1735030800000,"profession_id":"prof_002","profession_name":"战士","guild_id":"guild_101","guild_name":"天下第一盟","guild_master_id":"role_001","guild_master_name":"盟主大人","create_time":1735200000000,"create_ip":"192.168.1.100","extra":{"device":"iPhone 15","network":"5G","game_version":"1.2.0","channel":"官方"}} |
这里假设默认行为 action 如下:
-
1001: 注册
-
1002: 登录
-
1003: 升级
-
1004: 充值
-
1005: 退出
可以利用 kafka 客户端来手动写入 app-report 的 topic 之中:
1 | 可以直接手动复制粘贴插入数据 |
这里其实仅仅作为消息生产和消费的模式, 其实内部有很多实现都没有扩展出来说, 但是作为使用者我们只需要这就是消息队列的实现.
而对于 Kafka 是默认是不支持类似于 SQL 那样的聚合查询操作, 但是有时候能看到有的项目支持 SQL 语法.
这时因为内部扩展集成类似 KSQLDB/FlinkSQL/SparkSQL 等大数据处理工具, 将 Kafka 功能封装成类似 SQL 语法功能.
不过我们主要是采用 producer-consumer 模式, 其他入库同步等操作异步都是由脚本语言后台执行, 所以不关心在命令行执行查询的情况.
而现在已经实现这部分生产者投递, 后面只需要对应语言消费提取出来指定事件写入到 MySQL/PostgreSQL 即可, 这部分后续最好采用分页分表落地.
比如 app_report_{应用ID}(对应APPID) 分表或者 app_report_{20251203}(年月日分表), app_report_{202512}(年月分表) 等不同分表方式
入库这部分见仁见智, 需要按照不同情况来处理写入, 另外还有个额外的点, 那就是数据库表的唯一标识.
其实更加建议入库的数据表 primary key 设置为 UUID, 可以避免迁移或者写入的ID异常, 并且可以避免重复写入的问题:
1 | create table if not exists pino_app_report |
当然如果能实现雪花ID就更好, 也能够提升不少性能; 高版本 UUID 才优化好无序的问题, 采用有序序列方便递增优化
这部分的唯一ID也是在 API 上报的时候由服务端那边生成并顺带投递到 Kafka 之中, 而 UUID 特性保证基本很难出现重复相同情况.
主要大部分情况下 API 服务端都是大规模上报的情况, 这部分推送到 kafka 数据量很大, 必须要多个消费者任务来加快消费避免消息丢失抛弃;
而多个消费者任务也就代表必须要保证 id 不要受到争抢递增ID的占用影响, 所以必须要采用非递增而又能并行插入保证标识唯一.
Kafka 只是作为辅助手段, 主要就是保证数据库冷入库处理, 所以要尽可能安全加速数据库的落地, 避免消息滞留在 Kafka 而被触发删除规则.
数据落地 - Java
注意: Kafka 是作为消息队列方案, 所以要求执行落地的程序是常驻任务, 所以不适合定时调用处理
而需要常驻内存使用就算推荐采用匹配的原生 Java 方案, 这样就可以对技术栈做重合复用.
不过对于运行常驻的 Java 程序不要和 Kafka 在同一服务器之中, 否则高峰时期会发生资源抢占的问题
这里采用 Quarkus 做基础的容器管理和启动架构, Maven 的依赖组件如下(这里复用最开始的 Java17 版本):
1 |
|
内部已经集成 Quarkus 自带的 Kafka 和 ORM 组件, 只需要处理执行内部消费者任务即可, 首先追加 application.properties 配置:
1 | ## 通用配置 |
之后就是创建消费者的启动任务, 这里直接依托 Quarkus 设置启动任务来执行即可:
1 | import io.quarkus.runtime.Startup; |
这边入库代码我会简略编写, 不然代码占据太多篇幅观感也不好, 具体就是消费 Kafka 的消息入库而已
最后测试启动之后动态推入数据, 确认 PinoKafka 程序是否能够接收到消息队列推送的消息.




