1. Example
A simple demo from the official site:
public String quickstart() {
    // start a Transaction
    Transaction t = Cat.newTransaction("URL", "pageName");
    try {
        // log an Event
        Cat.logEvent("URL.Server", "serverIp", Event.SUCCESS, "ip=${serverIp}");
        // log Metrics
        Cat.logMetricForCount("metric.key");
        Cat.logMetricForDuration("metric.key", 5);
        t.setStatus(Transaction.SUCCESS);
    } catch (Exception e) {
        t.setStatus(e);
        Cat.logError(e);
    } finally {
        // mark the transaction complete
        t.complete();
    }
    return "hello";
}
2. Reports
2.1. Types
The common report types are as follows:
1. Transaction
Monitors the execution of a block of code.
Statistics include execution count, QPS, error count, failure rate, and response-time statistics (average response time, Tp percentiles), and so on.
2. Event
Monitors how many times a block of code runs.
For example, it records how many times an event occurred in the program and how many times it failed. The Event report has almost the same structure as the Transaction report, only without the response-time statistics.
3. Problem
Records problems that occur while the project is running, including exceptions, errors, and long-running requests. The Problem report is assembled from notable features found in logviews, making it easier for users to locate problems. Sources:
- Business code explicitly calls the Cat.logError(e) API; see the instrumentation docs for details.
- Integration with logging frameworks captures exception logs that contain stack traces.
- long-url: slow requests from Transactions instrumented with type URL
- long-sql: slow requests from Transactions instrumented with type SQL
- long-service: slow requests from Transactions instrumented with type Service or PigeonService
- long-call: slow requests from Transactions instrumented with type Call or PigeonCall
- long-cache: slow requests from Transactions instrumented with types starting with Cache.
4. Heartbeat
Statistics produced periodically inside the program, such as CPU utilization, memory utilization, connection pool status, and system load.
5. Metric
Records business metrics. A metric may record a count, an average, or a sum; the minimum aggregation granularity for business metrics is one minute.
2.2. Charts
2.2.1. Transaction
Clicking "show" displays the following (screenshot omitted):
Clicking a URL displays the following (screenshot omitted):
Clicking "Log View" displays the following (screenshot omitted):
2.2.2. Event
2.2.3. Heartbeat
System Info
GC Info
JVMHeap Info
FrameworkThread Info
Disk Info
CatUsage Info
client-send-queue Info
2.2.4. Metric
2.2.5. State
3. Client Internals
3.1. Architecture
The architecture diagram from the official site (image omitted):
Cat keeps the message context in a ThreadLocal. Within a single thread, the individual instrumentation points are chained into one message tree, where every node is a Message. Each message tree has a unique messageId. When the MessageTree is marked complete, it is sent to the server asynchronously through a MessageQueue.
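A minimal sketch of that idea (hypothetical classes, not CAT's actual implementation): a per-thread stack is enough to make nested instrumentation points attach to the currently open transaction, which is how the tree gets its shape.
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class ThreadContext {
    static final class Node {                       // stands in for CAT's Message
        final String name;
        final List<Node> children = new ArrayList<>();
        Node(String name) { this.name = name; }
    }

    // one stack per thread; the top of the stack is the currently open transaction
    private static final ThreadLocal<Deque<Node>> STACK =
            ThreadLocal.withInitial(ArrayDeque::new);

    static void begin(String name) {
        Node node = new Node(name);
        Node parent = STACK.get().peek();
        if (parent != null) {
            parent.children.add(node);  // nesting under the open transaction builds the tree
        }
        STACK.get().push(node);
    }

    static void end() {
        STACK.get().pop();              // popping the root would complete the tree
    }
}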
3.2. MessageId
Every log entry gets a unique messageId; when viewing logs, users look up a specific entry by its messageId.
Format:
{domain}-{ip}-{hour}-{index}, for example: app_test-ac130001-463547-10030
A MessageId has four segments:
- Segment 1: the application name
- Segment 2: the current machine's IP in hexadecimal
- Segment 3: the current system time divided down to hours (hours since epoch)
- Segment 4: a monotonically increasing sequence number for this client within the current hour
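As a sanity check, the segments can be reproduced in a few lines (a sketch following the format above; the real generator is CAT's MessageIdFactory):
long hour = System.currentTimeMillis() / 3_600_000L; // hours since epoch, e.g. 463547
String messageId = "app_test" + "-" + "ac130001"     // domain, then 172.19.0.1 in hex
        + "-" + hour + "-" + 10030;                  // hour segment, per-hour counter
// -> app_test-ac130001-463547-10030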
3.3. MessageTree
Within a single thread, the instrumentation points are chained into a message tree, and each message tree has a unique messageId. The main fields:
private ByteBuf m_buf;
// service (application) name
private String m_domain;
// host name
private String m_hostName;
// ip address
private String m_ipAddress;
// the root message (typically a Transaction)
private Message m_message;
// message id
private String m_messageId;
// parent message id
private String m_parentMessageId;
// root message id
private String m_rootMessageId;
private String m_sessionToken;
// thread group name
private String m_threadGroupName;
// thread id
private String m_threadId;
// thread name
private String m_threadName;
private boolean m_sample = true;
3.4. TcpSocketSender
When the client reports messages, it first puts them into a queue; a sender thread then takes messages off the queue and sends them to the server asynchronously.
3.4.1. Initialization
public void initialize() {
    // create a ChannelManager that manages the channels to the server
    m_manager = new ChannelManager(m_logger, m_serverAddresses, m_queue, m_configManager, m_factory);
    // start the TcpSocketSender task thread
    Threads.forGroup("cat").start(this);
    // start the ChannelManager task thread
    Threads.forGroup("cat").start(m_manager);
    // start the task thread that merges atomic messages
    Threads.forGroup("cat").start(new MergeAtomicTask());
}
3.4.2. Creating the ChannelManager
public ChannelManager(Logger logger,
        List<InetSocketAddress> serverAddresses,
        MessageQueue queue,
        ClientConfigManager configManager,
        MessageIdFactory idFactory) {
    m_logger = logger;
    m_queue = queue;
    m_configManager = configManager;
    m_idfactory = idFactory;
    EventLoopGroup group = new NioEventLoopGroup(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
    });
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group).channel(NioSocketChannel.class);
    bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
        }
    });
    m_bootstrap = bootstrap;
    // @1
    String serverConfig = loadServerConfig();
    if (StringUtils.isNotEmpty(serverConfig)) {
        List<InetSocketAddress> configedAddresses = parseSocketAddress(serverConfig);
        // @2
        ChannelHolder holder = initChannel(configedAddresses, serverConfig);
        if (holder != null) {
            // @3
            m_activeChannelHolder = holder;
        } else {
            m_activeChannelHolder = new ChannelHolder();
            m_activeChannelHolder.setServerAddresses(configedAddresses);
        }
    } else {
        ChannelHolder holder = initChannel(serverAddresses, null);
        if (holder != null) {
            m_activeChannelHolder = holder;
        } else {
            m_activeChannelHolder = new ChannelHolder();
            m_activeChannelHolder.setServerAddresses(serverAddresses);
            m_logger.error("error when init cat module due to error config xml in /data/appdatas/cat/client.xml");
        }
    }
}
@1: Pull the configured route list from the server.
@2: If the server has a route list configured, initialize a Channel from it. This method picks the first available ChannelHolder from the server-returned route list and assigns it to m_activeChannelHolder. When the client reports data, it fetches the active ChannelHolder from the ChannelManager and sends through it.
@3: Assign the obtained, working ChannelHolder to m_activeChannelHolder; afterwards the ChannelFuture can be obtained via getActiveFuture.
3.4.3. Sending messages
public void run() {
    m_active = true;
    while (m_active) {
        // @1
        ChannelFuture channel = m_manager.channel();
        if (channel != null && checkWritable(channel)) {
            try {
                // @2
                MessageTree tree = m_queue.poll();
                if (tree != null) {
                    sendInternal(tree);
                    tree.setMessage(null);
                }
            } catch (Throwable t) {
                m_logger.error("Error when sending message over TCP socket!", t);
            }
        }
        ...
    }
}
private void sendInternal(MessageTree tree) {
    // @3
    ChannelFuture future = m_manager.channel();
    ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K
    // @4
    m_codec.encode(tree, buf);
    int size = buf.readableBytes();
    Channel channel = future.channel();
    // @5
    channel.writeAndFlush(buf);
    if (m_statistics != null) {
        m_statistics.onBytes(size);
    }
}
@1: Poll the channel's status in a loop.
@2: When the channel is writable, poll a message tree off the message queue for processing.
@3: Get the ChannelFuture from the ChannelManager.
@4: Encode the message to be reported.
@5: Send the message.
The outgoing data is serialized into CAT's plain-text wire format; a sample payload looks like this:
PT1 app_test 172.19.0.1 172.19.0.1 main 30 http-nio-8088-exec-4 app_test-ac130001-463547-10030 null null nullt2022-11-18 19:36:57.628 URL pageNameE2022-11-18 19:36:57.628 URL.Server serverIp 0 ip=${serverIp}M2022-11-18 19:36:57.628 metric.key C 1M2022-11-18 19:36:57.628 metric.key T 5T2022-11-18 19:36:57.628 URL pageName 0 29us
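Reading the sample against the MessageTree fields from section 3.3 (tag meanings assumed from CAT's plain-text codec):
- Header: protocol version (PT1), followed by domain, host name, ip address, thread group name, thread id, thread name, the messageId, then the parent/root messageId and session token (null here).
- t.../T...: start and end of the transaction URL pageName (status 0, duration 29us).
- E...: the event URL.Server serverIp.
- M...: the two metrics, metric.key C 1 (a count) and metric.key T 5 (a duration).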
4. Server Internals
4.1. Architecture
The architecture diagram from the official site (image omitted):
4.2. Initialization
- Instantiate the Netty server and start listening on port 2280
- Choose the network transport according to the operating system (epoll is more efficient on Linux)
- Bind the message encoder/decoder
- Set the network communication options
public void init() {
    try {
        // start on the default port 2280
        startServer(m_port);
    } catch (Exception e) {
        m_logger.error(e.getMessage(), e);
    }
}
public synchronized void startServer(int port) throws InterruptedException {
    // apply different settings depending on the operating system
    boolean linux = getOSMatches("Linux") || getOSMatches("LINUX");
    int threads = 24;
    ServerBootstrap bootstrap = new ServerBootstrap();
    // set Netty's thread counts and channel type
    m_bossGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
    // epoll performs better on Linux
    m_workerGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
    bootstrap.group(m_bossGroup, m_workerGroup);
    bootstrap.channel(linux ? EpollServerSocketChannel.class : NioServerSocketChannel.class);
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            // add the message decoder and encoder
            pipeline.addLast("decode", new MessageDecoder());
            pipeline.addLast("encode", new ClientMessageEncoder());
        }
    });
    // network communication options
    bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
    bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
    bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    try {
        m_future = bootstrap.bind(port).sync();
        m_logger.info("start netty server!");
    } catch (Exception e) {
        m_logger.error("Started Netty Server Failed:" + port, e);
    }
}
4.3. Decoding
Received bytes are decoded into a message tree, which is then handed to a processor for consumption.
public class MessageDecoder extends ByteToMessageDecoder {
    private long m_processCount;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        if (buffer.readableBytes() < 4) {
            return;
        }
        buffer.markReaderIndex();
        // read the message length
        int length = buffer.readInt();
        buffer.resetReaderIndex();
        if (buffer.readableBytes() < length + 4) {
            return;
        }
        try {
            if (length > 0) {
                ByteBuf readBytes = buffer.readBytes(length + 4);
                readBytes.markReaderIndex();
                //readBytes.readInt();
                // decode the bytes into a MessageTree
                DefaultMessageTree tree = (DefaultMessageTree) CodecHandler.decode(readBytes);
                // readBytes.retain();
                readBytes.resetReaderIndex();
                tree.setBuffer(readBytes);
                // hand the MessageTree to the message handler for consumption
                m_handler.handle(tree);
                m_processCount++;
                long flag = m_processCount % CatConstants.SUCCESS_COUNT;
                if (flag == 0) {
                    m_serverStateManager.addMessageTotal(CatConstants.SUCCESS_COUNT);
                }
            } else {
                // client message is error
                buffer.readBytes(length);
                BufReleaseHelper.release(buffer);
            }
        } catch (Exception e) {
            m_serverStateManager.addMessageTotalLoss(1);
            m_logger.error(e.getMessage(), e);
        }
    }
}
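The decoder relies on length-prefixed framing: every message on the wire is a 4-byte big-endian length followed by that many payload bytes, and readBytes(length + 4) deliberately keeps the prefix inside the buffer attached to the tree. A minimal sketch of building such a frame (an assumption derived from the decode logic above, not CAT's actual encoder):
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;

static ByteBuf frame(String message) {
    byte[] payload = message.getBytes(StandardCharsets.UTF_8);
    ByteBuf frame = Unpooled.buffer(4 + payload.length);
    frame.writeInt(payload.length); // 4-byte big-endian length, as readInt() expects
    frame.writeBytes(payload);      // then the payload itself
    return frame;
}
Once decoded, the tree is handed to the message handler: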
public void handle(MessageTree tree) {
    if (m_consumer == null) {
        m_consumer = lookup(MessageConsumer.class);
    }
    try {
        m_consumer.consume(tree);
    } catch (Throwable e) {
        m_logger.error("Error when consuming message in " + m_consumer + "! tree: " + tree, e);
    }
}
4.4. Distribution
After a message tree is decoded, the server finds the Period matching the tree's timestamp (a Period covers one clock hour, e.g. 1:00-2:00) and distributes the tree to every type of report processor.
Each report processor holds a set of PeriodTasks. The domain is hashed (Math.abs(domain.hashCode()) % length) to pick one, and the message is put on that task's queue (a non-blocking offer) and consumed asynchronously, which avoids blocking and improves processing throughput.
public void consume(MessageTree tree) {
    long timestamp = tree.getMessage().getTimestamp();
    // one Period per hour
    Period period = m_periodManager.findPeriod(timestamp);
    if (period != null) {
        period.distribute(tree);
    } else {
        m_serverStateManager.addNetworkTimeError(1);
    }
}
public void initialize() throws InitializationException {
    // create the Period manager
    m_periodManager = new PeriodManager(HOUR, m_analyzerManager, m_serverStateManager, m_logger);
    m_periodManager.init();
    Threads.forGroup("cat").start(m_periodManager);
}
public void distribute(MessageTree tree) {
    m_serverStateManager.addMessageTotal(tree.getDomain(), 1);
    boolean success = true;
    String domain = tree.getDomain();
    // iterate over every type of task processor
    for (Entry<String, List<PeriodTask>> entry : m_tasks.entrySet()) {
        List<PeriodTask> tasks = entry.getValue();
        ......
        if (tasks.size() > 1) {
            // pick the queue by hashing the domain
            index = Math.abs(domain.hashCode()) % length;
        }
        PeriodTask task = tasks.get(index);
        // put the message on the queue for asynchronous processing
        boolean enqueue = task.enqueue(tree);
        ......
    }
    ......
}
Here m_tasks is the collection of task processors keyed by analyzer type, i.e. a Map<String, List<PeriodTask>> (the original shows its structure in a screenshot).
// enqueue a message
public boolean enqueue(MessageTree tree) {
    ......
    // m_queue is an ArrayBlockingQueue with a default size of 30000
    boolean result = m_queue.offer(tree);
    ......
}
// consume messages asynchronously on multiple threads
public void run() {
    try {
        m_analyzer.analyze(m_queue);
    } catch (Exception e) {
        Cat.logError(e);
    }
}
public void analyze(MessageQueue queue) {
    while (!isTimeout() && isActive()) {
        // take a message from the queue
        MessageTree tree = queue.poll();
        if (tree != null) {
            try {
                // consume the message; this abstract method is implemented by each analyzer
                process(tree);
            } catch (Throwable e) {
                m_errors++;
                if (m_errors == 1 || m_errors % 10000 == 0) {
                    Cat.logError(e);
                }
            }
        }
    }
    // if the analyzer has timed out, drain the remaining messages in the queue before returning
    while (true) {
        MessageTree tree = queue.poll();
        if (tree != null) {
            try {
                process(tree);
            } catch (Throwable e) {
                m_errors++;
                if (m_errors == 1 || m_errors % 10000 == 0) {
                    Cat.logError(e);
                }
            }
        } else {
            break;
        }
    }
}
protected abstract void process(MessageTree tree);
4.5. Analysis
Different analyzers examine the reported messages from different angles and aggregate them into different reports.
CAT's built-in analyzers correspond to the report types above (Transaction, Event, Problem, Heartbeat, Metric, State), plus the DumpAnalyzer used for raw-message storage (the original lists them in a screenshot).
After analyzing the reported messages, an analyzer stores the resulting report in a Report object. Reports are managed by the report manager (ReportManager) with the following structure:
// current period --> domain --> Report object
Map<Long, Map<String, T>> m_reports = new ConcurrentHashMap<Long, Map<String, T>>();
Taking TransactionAnalyzer as an example: it aggregates execution count, QPS, error count, failure rate, and response-time statistics (average response time, Tp percentiles), and stores the results in a TransactionReport.
public void process(MessageTree tree) {
    String domain = tree.getDomain();
    // fetch the report from the report manager by period and domain
    TransactionReport report = m_reportManager.getHourlyReport(getStartTime(), domain, true);
    List<Transaction> transactions = tree.findOrCreateTransactions();
    for (Transaction t : transactions) {
        String data = String.valueOf(t.getData());
        // process each Transaction
        if (data.length() > 0 && data.charAt(0) == CatConstants.BATCH_FLAG) {
            processBatchTransaction(tree, report, t, data);
        } else {
            processTransaction(report, tree, t);
        }
    }
    if (System.currentTimeMillis() > m_nextClearTime) {
        m_nextClearTime = m_nextClearTime + TimeHelper.ONE_MINUTE;
        Threads.forGroup("cat").start(new Runnable() {
            @Override
            public void run() {
                cleanUpReports();
            }
        });
    }
}
private void processTransaction(TransactionReport report, MessageTree tree, Transaction t) {
    String type = t.getType();
    String name = t.getName();
    if (!m_filterConfigManager.discardTransaction(type, name)) {
        boolean valid = checkForTruncatedMessage(tree, t);
        if (valid) {
            // extract the client ip
            String ip = tree.getIpAddress();
            TransactionType transactionType = findOrCreateType(report.findOrCreateMachine(ip), type);
            TransactionName transactionName = findOrCreateName(transactionType, name, report.getDomain());
            processTypeAndName(t, transactionType, transactionName, tree, t.getDurationInMillis());
        }
    }
}
private void processTypeAndName(Transaction t, TransactionType type, TransactionName name, MessageTree tree, double duration) {
    String messageId = tree.getMessageId();
    type.incTotalCount();
    name.incTotalCount();
    type.setSuccessMessageUrl(messageId);
    name.setSuccessMessageUrl(messageId);
    if (!t.isSuccess()) {
        type.incFailCount();
        name.incFailCount();
        String statusCode = formatStatus(t.getStatus());
        findOrCreateStatusCode(name, statusCode).incCount();
    }
    int allDuration = DurationComputer.computeDuration((int) duration);
    double sum = duration * duration;
    if (type.getMax() <= duration) {
        type.setLongestMessageUrl(messageId);
    }
    if (name.getMax() <= duration) {
        name.setLongestMessageUrl(messageId);
    }
    name.setMax(Math.max(name.getMax(), duration));
    name.setMin(Math.min(name.getMin(), duration));
    name.setSum(name.getSum() + duration);
    name.setSum2(name.getSum2() + sum);
    name.findOrCreateAllDuration(allDuration).incCount();
    type.setMax(Math.max(type.getMax(), duration));
    type.setMin(Math.min(type.getMin(), duration));
    type.setSum(type.getSum() + duration);
    type.setSum2(type.getSum2() + sum);
    type.findOrCreateAllDuration(allDuration).incCount();
    long current = t.getTimestamp() / 1000 / 60;
    int min = (int) (current % (60));
    boolean statistic = m_statisticManager.shouldStatistic(type.getId(), tree.getDomain());
    processNameGraph(t, name, min, duration, statistic, allDuration);
    processTypeRange(t, type, min, duration, statistic, allDuration);
}
private void processNameGraph(Transaction t, TransactionName name, int min, double d, boolean statistic,
        int allDuration) {
    int dk = formatDurationDistribute(d);
    Duration duration = name.findOrCreateDuration(dk);
    Range range = name.findOrCreateRange(min);
    duration.incCount();
    range.incCount();
    if (!t.isSuccess()) {
        range.incFails();
    }
    range.setSum(range.getSum() + d);
    range.setMax(Math.max(range.getMax(), d));
    range.setMin(Math.min(range.getMin(), d));
    if (statistic) {
        range.findOrCreateAllDuration(allDuration).incCount();
    }
}
private void processTypeRange(Transaction t, TransactionType type, int min, double d, boolean statistic,
        int allDuration) {
    Range2 range = type.findOrCreateRange2(min);
    if (!t.isSuccess()) {
        range.incFails();
    }
    range.incCount();
    range.setSum(range.getSum() + d);
    range.setMax(Math.max(range.getMax(), d));
    range.setMin(Math.min(range.getMin(), d));
    if (statistic) {
        range.findOrCreateAllDuration(allDuration).incCount();
    }
}
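Accumulating sum and sum2 (the sum of squared durations) is what later allows the avg and std fields in the report XML to be derived without keeping every sample; the arithmetic is the standard variance identity (a sketch, assuming n = totalCount):
static double std(long n, double sum, double sum2) {
    double avg = sum / n;
    return Math.sqrt(sum2 / n - avg * avg); // variance = E[d²] - E[d]²
}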
Given the following message tree, observe how the report changes at each stage:
PT1 cat devuser-pc 192.168.250.1 main 29 localhost-startStop-1 cat-c0a8fa01-463547-3538 null nullt2022-11-18 19:36:37.025 SQL config.findByNameE2022-11-18 19:36:37.391 SQL.Method SELECT 0 ["user-config"]E2022-11-18 19:36:37.391 SQL.Database jdbc:mysql://127.0.0.1:3306/cat?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&socketTimeout=120000 0T2022-11-18 19:36:38.034 SQL config.findByName 0 326665us SELECT c.id,c.name,c.content,c.creation_date,c.modify_date FROM config c WHERE c.name = ?
1. When the process method starts, the TransactionReport contains:
<?xml version="1.0" encoding="utf-8"?>
<transaction-report domain="cat" startTime="2022-11-18 19:00:00" endTime="2022-11-18 19:59:59">
</transaction-report>
2. After processNameGraph, the TransactionName data looks like:
<?xml version="1.0" encoding="utf-8"?>
<name id="config.findByName" totalCount="1" failCount="0" failPercent="0.00" min="284.00" max="284.00" avg="0.0" sum="284.0" sum2="80656.0" std="0.0" tps="0.00" line95Value="0.00" line99Value="0.00" line999Value="0.00" line90Value="0.00" line50Value="0.00" line9999Value="0.00">
<successMessageUrl>cat-c0a8fa01-463547-3538</successMessageUrl>
<longestMessageUrl>cat-c0a8fa01-463547-3538</longestMessageUrl>
<range value="28" count="1" sum="284.0" avg="0.0" fails="0" min="284.00" max="284.00" line95Value="0.00" line99Value="0.00" line999Value="0.00" line90Value="0.00" line50Value="0.00" line9999Value="0.00"/>
<duration value="512" count="1"/>
</name>
3. After processTypeRange, the TransactionType data looks like:
<?xml version="1.0" encoding="utf-8"?>
<type id="SQL" totalCount="1" failCount="0" failPercent="0.00" min="284.00" max="284.00" avg="0.0" sum="284.0" sum2="80656.0" std="0.0" tps="0.00" line95Value="0.00" line99Value="0.00" line999Value="0.00" line90Value="0.00" line50Value="0.00" line9999Value="0.00">
<successMessageUrl>cat-c0a8fa01-463547-3538</successMessageUrl>
<longestMessageUrl>cat-c0a8fa01-463547-3538</longestMessageUrl>
<name id="config.findByName" totalCount="1" failCount="0" failPercent="0.00" min="284.00" max="284.00" avg="0.0" sum="284.0" sum2="80656.0" std="0.0" tps="0.00" line95Value="0.00" line99Value="0.00" line999Value="0.00" line90Value="0.00" line50Value="0.00" line9999Value="0.00">
<successMessageUrl>cat-c0a8fa01-463547-3538</successMessageUrl>
<longestMessageUrl>cat-c0a8fa01-463547-3538</longestMessageUrl>
<range value="28" count="1" sum="284.0" avg="0.0" fails="0" min="284.00" max="284.00" line95Value="0.00" line99Value="0.00" line999Value="0.00" line90Value="0.00" line50Value="0.00" line9999Value="0.00"/>
<duration value="512" count="1"/>
</name>
<range2 value="28" count="1" sum="284.0" avg="0.0" fails="0" min="284.00" max="284.00" line95Value="0.00" line99Value="0.00" line999Value="0.00" line90Value="0.00" line50Value="0.00" line9999Value="0.00"/>
</type>
4. After the process method finishes, the TransactionReport contains:
<?xml version="1.0" encoding="utf-8"?>
<transaction-report domain="cat" startTime="2022-11-18 19:00:00" endTime="2022-11-18 19:59:59">
<machine ip="192.168.250.1">
<type id="SQL" totalCount="1" failCount="0" failPercent="0.00" min="284.00" max="284.00" avg="0.0" sum="284.0" sum2="80656.0" std="0.0" tps="0.00" line95Value="0.00" line99Value="0.00" line999Value="0.00" line90Value="0.00" line50Value="0.00" line9999Value="0.00">
<successMessageUrl>cat-c0a8fa01-463547-3538</successMessageUrl>
<longestMessageUrl>cat-c0a8fa01-463547-3538</longestMessageUrl>
<name id="config.findByName" totalCount="1" failCount="0" failPercent="0.00" min="284.00" max="284.00" avg="0.0" sum="284.0" sum2="80656.0" std="0.0" tps="0.00" line95Value="0.00" line99Value="0.00" line999Value="0.00" line90Value="0.00" line50Value="0.00" line9999Value="0.00">
<successMessageUrl>cat-c0a8fa01-463547-3538</successMessageUrl>
<longestMessageUrl>cat-c0a8fa01-463547-3538</longestMessageUrl>
<range value="28" count="1" sum="284.0" avg="0.0" fails="0" min="284.00" max="284.00" line95Value="0.00" line99Value="0.00" line999Value="0.00" line90Value="0.00" line50Value="0.00" line9999Value="0.00"/>
<duration value="512" count="1"/>
</name>
<range2 value="28" count="1" sum="284.0" avg="0.0" fails="0" min="284.00" max="284.00" line95Value="0.00" line99Value="0.00" line999Value="0.00" line90Value="0.00" line50Value="0.00" line9999Value="0.00"/>
</type>
</machine>
</transaction-report>
4.6. Storage
There are two broad categories:
- storage of the raw data (MessageTree)
- storage of the various reports (Transaction, Event, etc.)
4.6.1. Raw data
Raw-data storage is done by DumpAnalyzer. Data is saved under [/data/appdatas/cat/bucket/dump/yyyyMMdd/HH]. When logs are written to disk, each application produces two kinds of files: a data file (.dat) and an index file (.idx).
(The original shows the file listing and the on-disk structure in screenshots.)
4.6.1.1. DumpAnalyzer
DumpAnalyzer parses the message tree and extracts the messageId, from which the log's domain, ip, and other information can be obtained. The tree is then handed to a MessageDumper for further processing:
public void process(MessageTree tree) {
    try {
        MessageId messageId = MessageId.parse(tree.getMessageId());
        if (!shouldDiscard(messageId)) {
            processWithStorage(tree, messageId, messageId.getHour());
        }
    } catch (Exception ignored) {
    }
}
private void processWithStorage(MessageTree tree, MessageId messageId, int hour) {
    MessageDumper dumper = m_dumperManager.find(hour);
    tree.setFormatMessageId(messageId);
    if (dumper != null) {
        dumper.process(tree);
    } else {
        m_serverStateManager.addPigeonTimeError(1);
    }
}
4.6.1.2. MessageDumper
MessageDumper's logic lives in its process method: the client ip is hashed and taken modulo the thread count to pick a queue. Each queue is owned by one MessageProcessor (20 by default) that takes messages off the queue and consumes them asynchronously.
private BlockDumperManager m_blockDumperManager;
private BucketManager m_bucketManager;
private ServerStatisticManager m_statisticManager;
private ServerConfigManager m_configManager;
// buffer queues (20 by default)
private List<BlockingQueue<MessageTree>> m_queues = new ArrayList<BlockingQueue<MessageTree>>();
// processors (20 by default)
private List<MessageProcessor> m_processors = new ArrayList<MessageProcessor>();
private AtomicInteger m_failCount = new AtomicInteger(-1);
private Logger m_logger;
private long m_total;
// 20 threads by default
private int m_processThreads;

public void initialize(int hour) {
    // 20 threads by default
    int processThreads = m_configManager.getMessageProcessorThreads();
    m_processThreads = processThreads;
    for (int i = 0; i < processThreads; i++) {
        BlockingQueue<MessageTree> queue = new ArrayBlockingQueue<MessageTree>(10000);
        MessageProcessor processor = lookup(MessageProcessor.class);
        m_queues.add(queue);
        m_processors.add(processor);
        processor.initialize(hour, i, queue);
        Threads.forGroup("Cat").start(processor);
    }
}
public void process(MessageTree tree) {
    // parse the MessageId; it carries the log's domain, ip, etc.
    MessageId id = tree.getFormatMessageId();
    String domain = id.getDomain();
    // hash the ip modulo the thread count to pick the queue
    // (Math.abs(id.getIpAddressInHex().hashCode())) % (m_processThreads)
    int index = getIndex(id.getIpAddressInHex());
    // the queue is an ArrayBlockingQueue of size 10000
    BlockingQueue<MessageTree> queue = m_queues.get(index);
    // enqueue the message
    boolean success = queue.offer(tree);
    ......
}
private int getIndex(String key) {
    // m_processThreads defaults to 20
    return (Math.abs(key.hashCode())) % (m_processThreads);
}
private void closeMessageProcessor() throws InterruptedException {
    ......
    for (MessageProcessor processor : m_processors) {
        processor.shutdown();
        super.release(processor);
    }
}
4.6.1.3. MessageProcessor
DefaultMessageProcessor handles each message as follows. It first looks up the domain's Block in a Map<domain, Block>, compresses the message with the Snappy algorithm, and appends it to the Block's in-memory buffer, recording the message's MessageId and its offset within the Block in a Map<MessageId, offset>. A log entry can therefore be located from its MessageId plus its Block. (Each appended message starts with 4 bytes holding its length, the prefix kept from the wire format, so a message can be read back from its offset alone.) Messages are appended until the Block fills its 256K, at which point a new Block is created and the old one is handed to DefaultBlockDumper to be written to disk.
DefaultMessageProcessor's code is as follows:
private BlockDumperManager m_blockDumperManager;
private MessageFinderManager m_finderManager;
private ServerConfigManager m_configManger;
private BlockDumper m_dumper;
// the index of this MessageProcessor
private int m_index;
// new ArrayBlockingQueue<MessageTree>(10000)
private BlockingQueue<MessageTree> m_queue;
// keyed by application name; a Block is an in-memory buffer holding log data
private ConcurrentHashMap<String, Block> m_blocks = new ConcurrentHashMap<String, Block>();
private int m_hour;
private AtomicBoolean m_enabled;
private CountDownLatch m_latch;
private int m_count;
public void run() {
    MessageTree tree;
    try {
        while (m_enabled.get() || !m_queue.isEmpty()) {
            tree = pollMessage();
            if (tree != null) {
                processMessage(tree);
            }
        }
    } catch (InterruptedException e) {
    }
    for (Block block : m_blocks.values()) {
        try {
            block.finish();
            m_dumper.dump(block);
        } catch (IOException e) {
            // ignore it
        }
    }
    m_blocks.clear();
    m_latch.countDown();
}
private void processMessage(MessageTree tree) {
    MessageId id = tree.getFormatMessageId();
    String domain = id.getDomain();
    int hour = id.getHour();
    // get the application's Block
    Block block = m_blocks.get(domain);
    // if there is no Block yet, create one
    if (block == null) {
        block = new DefaultBlock(domain, hour);
        m_blocks.put(domain, block);
    }
    ByteBuf buffer = tree.getBuffer();
    try {
        // when the Block has filled its 256K
        if (block.isFull()) {
            block.finish();
            // hand the Block to DefaultBlockDumper to be written to disk
            m_dumper.dump(block);
            // start a new Block
            block = new DefaultBlock(domain, hour);
            m_blocks.put(domain, block);
        }
        // if the Block is not full, append the message to its buffer
        block.pack(id, buffer);
    } catch (Exception e) {
        Cat.logError(e);
    } finally {
        ReferenceCountUtil.release(buffer);
    }
}
DefaultBlock's code is as follows:
// a Block holds 256K
private static final int MAX_SIZE = 256 * 1024;
private String m_domain;
private int m_hour;
// the data stored in this Block
private ByteBuf m_data;
private int m_offset;
// records each message's offset within the Block
private Map<MessageId, Integer> m_offsets = new LinkedHashMap<MessageId, Integer>();
// a SnappyOutputStream
private volatile OutputStream m_out;
private volatile boolean m_isFlush;

public void pack(MessageId id, ByteBuf buf) throws IOException {
    synchronized (m_out) {
        int len = buf.readableBytes();
        buf.readBytes(m_out, len);
        // record the message's offset within the (uncompressed) data
        m_offsets.put(id, m_offset);
        m_offset += len;
    }
}
SnappyOutputStream comes from the snappy-java library and implements Google's Snappy compression algorithm, which trades some compression ratio for very high speed. Writing through a SnappyOutputStream compresses the data on the fly, shrinking it to save storage space and network bandwidth, which is useful when handling or transferring large volumes of data.
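A minimal usage sketch (snappy-java's org.xerial.snappy.SnappyOutputStream; the 32K block size is an arbitrary example, not CAT's setting):
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.xerial.snappy.SnappyOutputStream;

static byte[] compress(byte[] payload) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (OutputStream out = new SnappyOutputStream(sink, 32 * 1024)) {
        out.write(payload); // compressed on the fly, one Snappy block at a time
    }
    return sink.toByteArray();
}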
4.6.1.4. BlockDumper
BlockDumper hashes the domain to pick a queue and hands the Block to a BlockWriter (DefaultBlockWriter) for the actual disk write.
private ServerStatisticManager m_statisticManager;
private ServerConfigManager m_configManager;
private List<BlockingQueue<Block>> m_queues = new ArrayList<BlockingQueue<Block>>();
private List<BlockWriter> m_writers = new ArrayList<BlockWriter>();
private int m_failCount = -1;
private Logger m_logger;

public void initialize(int hour) {
    // 5 threads by default
    int threads = m_configManager.getMessageDumpThreads();
    for (int i = 0; i < threads; i++) {
        BlockingQueue<Block> queue = new ArrayBlockingQueue<Block>(10000);
        BlockWriter writer = lookup(BlockWriter.class);
        m_queues.add(queue);
        m_writers.add(writer);
        writer.initialize(hour, i, queue);
        Threads.forGroup("Cat").start(writer);
    }
}
public void dump(Block block) throws IOException {
    String domain = block.getDomain();
    int hash = Math.abs(domain.hashCode());
    int index = hash % m_writers.size();
    BlockingQueue<Block> queue = m_queues.get(index);
    boolean success = queue.offer(block);
    if (!success) {
        m_statisticManager.addBlockLoss(1);
        if ((++m_failCount % 100) == 0) {
            Cat.logError(new BlockQueueFullException("Error when adding block to queue, fails: " + m_failCount));
            m_logger.info("block dump queue is full " + m_failCount + " index:" + index);
        }
    } else {
        m_statisticManager.addBlockTotal(1);
    }
}
4.6.1.5. BlockWriter
BlockWriter writes each Block to disk: it obtains the matching LocalBucket from the bucket manager and writes the Block's data through it.
private BucketManager m_bucketManager;
private ServerStatisticManager m_statisticManager;
private int m_index;
private BlockingQueue<Block> m_queue;
private long m_hour;
private int m_count;
private AtomicBoolean m_enabled;
private CountDownLatch m_latch;

public void initialize(int hour, int index, BlockingQueue<Block> queue) {
    m_hour = hour;
    m_index = index;
    m_queue = queue;
    m_enabled = new AtomicBoolean(true);
    m_latch = new CountDownLatch(1);
}

public void run() {
    String ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
    try {
        while (m_enabled.get() || !m_queue.isEmpty()) {
            Block block = m_queue.poll(5, TimeUnit.MILLISECONDS);
            if (block != null) {
                long time = System.currentTimeMillis();
                processBlock(ip, block);
                long duration = System.currentTimeMillis() - time;
                m_statisticManager.addBlockTime(duration);
            }
        }
    } catch (InterruptedException e) {
        // ignore it
    }
    m_latch.countDown();
}
private void processBlock(String ip, Block block) {
    try {
        Bucket bucket = m_bucketManager.getBucket(block.getDomain(), ip, block.getHour(), true);
        // every 1000th block is wrapped in a CAT transaction for self-monitoring
        boolean monitor = (++m_count) % 1000 == 0;
        if (monitor) {
            Transaction t = Cat.newTransaction("Block", block.getDomain());
            try {
                bucket.puts(block.getData(), block.getOffsets());
                t.setStatus(Transaction.SUCCESS);
            } catch (Exception e) {
                Cat.logError(ip, e);
                t.setStatus(e);
            }
            t.complete();
        } else {
            try {
                bucket.puts(block.getData(), block.getOffsets());
            } catch (Exception e) {
                Cat.logError(ip, e);
            }
        }
    } catch (Exception e) {
        Cat.logError(ip, e);
    } catch (Error e) {
        Cat.logError(ip, e);
    } finally {
        block.clear();
    }
}
4.6.1.6. LocalBucket
Each application has one LocalBucket per hour. When logs are stored to disk, each application produces two files per hour: a data file (.dat) and an index file (.idx).
public synchronized void puts(ByteBuf data, Map<MessageId, Integer> mappings) throws IOException {
    long dataOffset = m_data.getDataOffset();
    // write the data into the data file (.dat)
    m_data.write(data);
    for (Map.Entry<MessageId, Integer> e : mappings.entrySet()) {
        MessageId id = e.getKey();
        int offset = e.getValue();
        // write the index entries into the index file (.idx)
        m_index.write(id, dataOffset, offset);
    }
}
4.6.1.6.1. DataHelper
Handles reads and writes of the data file (.dat). The storage is simple: each Block's data is appended to the file, prefixed with its 4-byte length.
private void write(ByteBuf data) throws IOException {
    int len = data.readableBytes();
    m_out.writeInt(len);
    data.readBytes(m_out, len);
    m_offset += len + 4;
}
4.6.1.6.2. IndexHelper
IndexHelper handles reads and writes of the index file (.idx). It records, for each MessageId, the offset of the corresponding message in the data file, so a message can be located through the index file.
The index structure is as follows:
Level-1 index:
- consists of 4096 8-byte entries
- each entry is 64 bits: the first 32 bits are the IP, the last 32 bits are index/4k
- the first 8 bytes store a magic number (shown as -1 in the diagram) used to validate the file
- the remaining 4095 8-byte entries map one-to-one, in order, onto the segments of the level-2 index
Level-2 index:
- consists of 4095 segments, each made of 4096 8-byte entries
- each entry is 64 bits: the first 40 bits are the block's start address, the last 24 bits are the offset within the uncompressed block
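The 40/24 split shows up in the write() below, which packs value = (blockAddress << 24) + blockOffset; decoding an entry is the mirror image (a sketch, not CAT code):
static long blockAddress(long entry) {
    return entry >>> 24;             // upper 40 bits: block start address in the .dat file
}
static int blockOffset(long entry) {
    return (int) (entry & 0xFFFFFF); // lower 24 bits: offset inside the uncompressed block
}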
private void write(MessageId id, long blockAddress, int blockOffset) throws IOException {
    long position = m_header.getOffset(id.getIpAddressValue(), id.getIndex(), true);
    long address = position / SEGMENT_SIZE;
    int offset = (int) (position % SEGMENT_SIZE);
    // locate the segment
    Segment segment = getSegment(id.getIpAddressInHex(), address);
    long value = (blockAddress << 24) + blockOffset;
    if (segment != null) {
        // level-2 index entry: first 40 bits are the block address, last 24 bits the in-block offset
        segment.writeLong(offset, value);
    } else {
        if (m_count.incrementAndGet() % 1000 == 0) {
            Cat.logEvent("AbnormalBlock", id.getDomain());
        }
        if (m_nioEnabled) {
            m_indexChannel.position(position);
            ByteBuffer buf = ByteBuffer.allocate(8);
            buf.putLong(value);
            buf.flip();
            m_indexChannel.write(buf);
        } else {
            m_file.seek(position);
            m_file.writeLong(value);
        }
    }
}
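Putting the two files together, reading a message back by id works roughly like this (a sketch; index.lookup and data.readBlockAt are hypothetical helpers, the real logic lives in LocalBucket, IndexHelper, and DataHelper):
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.xerial.snappy.Snappy;

static ByteBuf readMessage(IndexHelper index, DataHelper data, MessageId messageId) throws Exception {
    long entry = index.lookup(messageId);       // hypothetical: packed 64-bit .idx entry
    long blockAddress = entry >>> 24;
    int blockOffset = (int) (entry & 0xFFFFFF);
    byte[] block = Snappy.uncompress(data.readBlockAt(blockAddress)); // hypothetical block read
    ByteBuf buf = Unpooled.wrappedBuffer(block).skipBytes(blockOffset);
    int len = buf.readInt();                    // each message is itself length-prefixed
    return buf.readBytes(len);                  // the raw message bytes, as written by pack()
}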
4.6.2. Report data
Each type of report produces two files: a data file and an index file.
Directory
Reports are saved under [/data/appdatas/cat/bucket/report/yyyyMMdd/HH/index].
Data file
Stores all report data for one analyzer; for example, the contents of a report-transaction file (screenshot omitted):
Index file
Stores the position of each report within the data file; for example, the contents of report-transaction.idx (screenshot omitted):
Source code
Taking TransactionAnalyzer as the example, its persistence is done in doCheckpoint:
public synchronized void doCheckpoint(boolean atEnd) {
    if (atEnd && !isLocalMode()) {
        m_reportManager.storeHourlyReports(getStartTime(), StoragePolicy.FILE_AND_DB, m_index);
    } else {
        m_reportManager.storeHourlyReports(getStartTime(), StoragePolicy.FILE, m_index);
    }
}
public void storeHourlyReports(long startTime, StoragePolicy policy, int index) {
    ......
    // take the reports for this period
    Map<String, T> reports = m_reports.get(startTime);
    ReportBucket bucket = m_bucketManager.getReportBucket(startTime, m_name, index);
    // pre-processing before saving
    m_reportDelegate.beforeSave(reports);
    try {
        // save
        storeFile(reports, bucket);
    } finally {
        m_bucketManager.closeBucket(bucket);
    }
    ......
}
private void storeFile(Map<String, T> reports, ReportBucket bucket) {
    for (T report : reports.values()) {
        try {
            // get the domain
            String domain = m_reportDelegate.getDomain(report);
            // serialize the report to XML
            String xml = m_reportDelegate.buildXml(report);
            // store the report
            bucket.storeById(domain, xml);
        } catch (Exception e) {
            Cat.logError(e);
        }
    }
}
public ReportBucket getReportBucket(long timestamp, String name, int index) throws IOException {
    Date date = new Date(timestamp);
    ReportBucket bucket = lookup(ReportBucket.class);
    bucket.initialize(name, date, index);
    return bucket;
}
public void initialize(String name, Date timestamp, int index) throws IOException {
    // report storage directory
    m_baseDir = Cat.getCatHome() + "bucket/report";
    m_writeLock = new ReentrantLock();
    m_readLock = new ReentrantLock();
    // yyyyMMdd/HH/index/report-transaction (e.g. 20220825/19/0/report-transaction)
    String logicalPath = m_pathBuilder.getReportPath(name, timestamp, index);
    // data file
    File dataFile = new File(m_baseDir, logicalPath);
    // index file
    File indexFile = new File(m_baseDir, logicalPath + ".idx");
    if (indexFile.exists()) {
        loadIndexes(indexFile);
    }
    final File dir = dataFile.getParentFile();
    if (!dir.exists() && !dir.mkdirs()) {
        throw new IOException(String.format("Fail to create directory(%s)!", dir));
    }
    m_logicalPath = logicalPath;
    m_writeDataFile = new BufferedOutputStream(new FileOutputStream(dataFile, true), 8192);
    m_writeIndexFile = new BufferedOutputStream(new FileOutputStream(indexFile, true), 8192);
    m_writeDataFileLength = dataFile.length();
    m_readDataFile = new RandomAccessFile(dataFile, "r");
}
public boolean storeById(String id, String report) throws IOException {
    // report content
    byte[] content = report.getBytes("utf-8");
    // report length
    int length = content.length;
    byte[] num = String.valueOf(length).getBytes("utf-8");
    m_writeLock.lock();
    try {
        // write the report data: "<length>\n<content>\n"
        m_writeDataFile.write(num);
        m_writeDataFile.write('\n');
        m_writeDataFile.write(content);
        m_writeDataFile.write('\n');
        m_writeDataFile.flush();
        long offset = m_writeDataFileLength;
        String line = id + '\t' + offset + '\n';
        byte[] data = line.getBytes("utf-8");
        // write the index entry: "<id>\t<offset>\n"
        m_writeDataFileLength += num.length + 1 + length + 1;
        m_writeIndexFile.write(data);
        m_writeIndexFile.flush();
        m_idToOffsets.put(id, offset);
        return true;
    } finally {
        m_writeLock.unlock();
    }
}
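For example (hypothetical numbers): storing the report of domain cat, whose XML is 1234 bytes, at offset 0 appends "1234\n<xml>\n" to report-transaction and "cat\t0\n" to report-transaction.idx; m_idToOffsets mirrors the index in memory, and the findById method shown in section 4.7 is the read-side counterpart.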
4.7. Queries
4.7.1. Report data
Taking the Transaction report as the example, the source is as follows:
@OutboundActionMeta(name = "t")
public void handleOutbound(Context ctx) {
    Model model = new Model(ctx);
    Payload payload = ctx.getPayload();
    String domain = payload.getDomain();
    String ipAddress = payload.getIpAddress();
    String group = payload.getGroup();
    // query the hourly report
    TransactionReport report = getHourlyReport(payload);
    // when querying the ALL dimension, aggregate the metrics of all machines
    report = m_mergeHelper.mergeAllMachines(report, ipAddress);
    if (report != null) {
        model.setReport(report);
        buildTransactionMetaInfo(model, payload, report);
    }
    if (payload.isXml()) {
        // return the XML view
        m_xmlViewer.view(ctx, model);
    } else {
        // return the JSP view
        m_jspViewer.view(ctx, model);
    }
}
private TransactionReport getHourlyReport(Payload payload) {
    String domain = payload.getDomain();
    String ipAddress = payload.getIpAddress();
    ModelRequest request = new ModelRequest(domain, payload.getDate()).setProperty("type", payload.getType())
            .setProperty("ip", ipAddress);
    if (m_service.isEligable(request)) {
        ModelResponse<TransactionReport> response = m_service.invoke(request);
        TransactionReport report = response.getModel();
        return report;
    } else {
        throw new RuntimeException("Internal error: no eligable transaction service registered for " + request + "!");
    }
}
public String buildReport(ModelRequest request, ModelPeriod period, String domain, ApiPayload payload)
        throws Exception {
    // 1. look in memory first
    List<TransactionReport> reports = super.getReport(period, domain);
    TransactionReport report = null;
    if (reports != null) {
        report = new TransactionReport(domain);
        TransactionReportMerger merger = new TransactionReportMerger(report);
        for (TransactionReport tmp : reports) {
            tmp.accept(merger);
        }
    }
    // 2. if not in memory, read the report data from disk
    if ((report == null || report.getIps().isEmpty()) && period.isLast()) {
        long startTime = request.getStartTime();
        report = getReportFromLocalDisk(startTime, domain);
    }
    return filterReport(payload, report);
}
private TransactionReport getReportFromLocalDisk(long timestamp, String domain) throws Exception {
    TransactionReport report = new TransactionReport(domain);
    TransactionReportMerger merger = new TransactionReportMerger(report);
    report.setStartTime(new Date(timestamp));
    report.setEndTime(new Date(timestamp + TimeHelper.ONE_HOUR - 1));
    for (int i = 0; i < getAnalyzerCount(); i++) {
        ReportBucket bucket = null;
        try {
            // directory: /data/appdatas/cat/bucket/report/yyyyMMdd/HH/0/report-transaction
            bucket = m_bucketManager.getReportBucket(timestamp, TransactionAnalyzer.ID, i);
            // read the report data from disk
            String xml = bucket.findById(domain);
            if (xml != null) {
                TransactionReport tmp = DefaultSaxParser.parse(xml);
                tmp.accept(merger);
            }
        } finally {
            if (bucket != null) {
                m_bucketManager.closeBucket(bucket);
            }
        }
    }
    return report;
}
Reading the report data from disk:
public String findById(String id) throws IOException {
    Long offset = m_idToOffsets.get(id);
    if (offset != null) {
        m_readLock.lock();
        try {
            m_readDataFile.seek(offset);
            int num = Integer.parseInt(m_readDataFile.readLine());
            byte[] bytes = new byte[num];
            m_readDataFile.readFully(bytes);
            return new String(bytes, "utf-8");
        } catch (Exception e) {
            m_logger.error(String.format("Error when reading file(%s)!", m_readDataFile), e);
        } finally {
            m_readLock.unlock();
        }
    }
    return null;
}
The returned string is the report XML, in the same shape as the transaction-report samples shown in section 4.5.