
CAT: Principles and Source Code Analysis

1. Example

A simple demo from the official documentation:

public String quickstart() {
    // start a Transaction
    Transaction t = Cat.newTransaction("URL", "pageName");
    try {
        // log an Event
        Cat.logEvent("URL.Server", "serverIp", Event.SUCCESS, "ip=${serverIp}");
        // log Metrics
        Cat.logMetricForCount("metric.key");
        Cat.logMetricForDuration("metric.key", 5);
        t.setStatus(Transaction.SUCCESS);
    } catch (Exception e) {
        t.setStatus(e);
        Cat.logError(e);
    } finally {
        // mark the transaction complete
        t.complete();
    }
    return "hello";
}

2. Reports

2.1. Types

The common report types are:

1. Transaction

Monitors how a block of code executes.

Execution count, QPS, error count, failure rate, response-time statistics (average response time, Tp quantiles), and so on.

2. Event

Monitors how many times a block of code executes.

For example, it records how many times an event occurred and how many times it failed. The Event report has almost the same structure as the Transaction report, only without the response-time statistics.

3. Problem

Records the problems that occur while the project runs, including exceptions, errors, and slow accesses. The Problem report is assembled from notable features of the logviews, which makes it easy for users to locate issues. Sources:

  • The business code explicitly calls the Cat.logError(e) API; see the instrumentation documentation for details.
  • Integration with logging frameworks captures logged exceptions that carry a stack trace.
  • long-url: slow requests from Transactions of type URL
  • long-sql: slow requests from Transactions of type SQL
  • long-service: slow requests from Transactions of type Service or PigeonService
  • long-call: slow requests from Transactions of type Call or PigeonCall
  • long-cache: slow requests from Transactions whose type starts with Cache.

4. Heartbeat

Statistics produced periodically inside the process, such as CPU utilization, memory utilization, connection-pool status, and system load.

5. Metric

Records business metrics. A metric can record a count, an average, or a sum; the minimum aggregation granularity for business metrics is one minute.
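
As a sketch of how those three flavors are reported from application code (logMetricForSum belongs to the same client API as the two calls in the quickstart; treat the exact overloads as an assumption):

// count: how many times something happened
Cat.logMetricForCount("order.created");          // +1
Cat.logMetricForCount("order.created", 3);       // +3

// duration: aggregated into per-minute averages
Cat.logMetricForDuration("order.process", 42);   // 42 ms

// sum: accumulate a business value, e.g. an order amount
Cat.logMetricForSum("order.amount", 99.9);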

2.2. Charts

2.2.1. Transaction

The report page supports drilling down (screenshots omitted): clicking show expands the detail view, clicking URL drills into that type, and clicking Log View opens the logview of a single message.

2.2.2. Event

2.2.3. Heartbeat

System Info

GC Info

JVMHeap Info

FrameworkThread Info

Disk Info

CatUsage Info

client-send-queue Info

2.2.4. Metric

2.2.5. State

3. Client Internals

3.1. Architecture

The official architecture diagram is omitted here; in summary:

CAT stores the message context in a ThreadLocal. Within a single thread, all instrumentation points are chained into one message tree; every node of the tree is a Message, and every tree has a globally unique messageId. When the MessageTree is marked complete, it is sent to the server asynchronously through a MessageQueue.
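
For example, nesting instrumentation calls in one thread produces a tree whose root is the outer Transaction (a minimal sketch using only the API shown in the quickstart):

Transaction root = Cat.newTransaction("URL", "/order/create");    // root of the message tree
try {
    Cat.logEvent("URL.Method", "POST");                           // child node: an Event

    Transaction sql = Cat.newTransaction("SQL", "order.insert");  // child node: a nested Transaction
    try {
        // ... execute the statement ...
        sql.setStatus(Transaction.SUCCESS);
    } finally {
        sql.complete();               // completes the child; it stays attached to the tree
    }

    root.setStatus(Transaction.SUCCESS);
} catch (Exception e) {
    root.setStatus(e);
    Cat.logError(e);
} finally {
    root.complete();                  // completes the tree and enqueues it for sending
}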

3.2. MessageId

Every message tree gets a unique messageId; when viewing logs, users locate a specific log by its messageId.

Format

{domain}-{ip}-{hour}-{index}, for example: app_test-ac130001-463547-10030

A MessageId consists of four segments:

  • Segment 1: the application name (domain)
  • Segment 2: the machine's IP address in hexadecimal
  • Segment 3: the current time in hours since the epoch (timestamp divided by one hour)
  • Segment 4: a sequence number of this client, incrementing within the current hour
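
A sketch of how such an id can be assembled from the four parts (the real client does this inside MessageIdFactory; the helper below is hypothetical):

// Hypothetical helper mirroring the {domain}-{ip}-{hour}-{index} layout.
static String buildMessageId(String domain, byte[] ipv4, int index) {
    StringBuilder ipHex = new StringBuilder();
    for (byte b : ipv4) {
        ipHex.append(String.format("%02x", b));         // e.g. 172.19.0.1 -> ac130001
    }
    long hour = System.currentTimeMillis() / 3600_000L; // hours since the epoch, e.g. 463547
    return domain + "-" + ipHex + "-" + hour + "-" + index;
}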

3.3. MessageTree

Within a single thread, all instrumentation points are chained into one message tree, and each tree has a unique messageId. The main fields of MessageTree:

private ByteBuf m_buf;

// service (application) name
private String m_domain;

// host name
private String m_hostName;

// ip address
private String m_ipAddress;

// the root message (typically a Transaction)
private Message m_message;

// message id
private String m_messageId;

// parent message id
private String m_parentMessageId;

// root message id
private String m_rootMessageId;

private String m_sessionToken;

// thread group name
private String m_threadGroupName;

// thread id
private String m_threadId;

// thread name
private String m_threadName;

private boolean m_sample = true;

3.4. TcpSocketSender

When the client reports a message, it first puts the message into a queue; a sender thread then polls the queue and sends the messages to the server asynchronously.
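
The enqueue side is essentially a non-blocking offer; a minimal sketch (field and method names assumed, not the exact source):

// Hypothetical sketch: the business thread only does a non-blocking offer.
public boolean send(MessageTree tree) {
    boolean accepted = m_queue.offer(tree); // never blocks the caller
    if (!accepted) {
        logQueueFullInfo(tree); // queue full: the tree is dropped and counted
    }
    return accepted;
}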

3.4.1. Initialization

public void initialize() {
    // create a ChannelManager, which manages the channels to the server
    m_manager = new ChannelManager(m_logger, m_serverAddresses, m_queue, m_configManager, m_factory);
    // start the TcpSocketSender task thread
    Threads.forGroup("cat").start(this);
    // start the ChannelManager task thread
    Threads.forGroup("cat").start(m_manager);
    // start the task thread that merges atomic messages
    Threads.forGroup("cat").start(new MergeAtomicTask());
}

3.4.2. Creating the ChannelManager

public ChannelManager(Logger logger, 
                      List<InetSocketAddress> serverAddresses, 
                      MessageQueue queue,
                      ClientConfigManager configManager, 
                      MessageIdFactory idFactory) {
    m_logger = logger;
    m_queue = queue;
    m_configManager = configManager;
    m_idfactory = idFactory;

    EventLoopGroup group = new NioEventLoopGroup(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
    });

    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group).channel(NioSocketChannel.class);
    bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
        }
    });
    m_bootstrap = bootstrap;
  
    // @1
    String serverConfig = loadServerConfig();

    if (StringUtils.isNotEmpty(serverConfig)) {
        List<InetSocketAddress> configedAddresses = parseSocketAddress(serverConfig);
  
        // @2
        ChannelHolder holder = initChannel(configedAddresses, serverConfig);

        if (holder != null) {
  
            // @3
            m_activeChannelHolder = holder;
        } else {
            m_activeChannelHolder = new ChannelHolder();
            m_activeChannelHolder.setServerAddresses(configedAddresses);
        }
    } else {
        ChannelHolder holder = initChannel(serverAddresses, null);

        if (holder != null) {
            m_activeChannelHolder = holder;
        } else {
            m_activeChannelHolder = new ChannelHolder();
            m_activeChannelHolder.setServerAddresses(serverAddresses);
            m_logger.error("error when init cat module due to error config xml in /data/appdatas/cat/client.xml");
        }
    }
}

@1: pull the configured server (router) list from the server.

@2: if the server returned a router list, initialize the channel with it. This method picks the first available ChannelHolder from the returned list and assigns it to m_activeChannelHolder. When the client reports data, it fetches the active ChannelHolder from the ChannelManager and sends through it.

@3: assign the valid ChannelHolder to m_activeChannelHolder, so the ChannelFuture can later be obtained via getActiveFuture.

3.4.3. Sending messages

public void run() {
    m_active = true;
    while (m_active) {
        // @1
        ChannelFuture channel = m_manager.channel();
        if (channel != null && checkWritable(channel)) {
            try {
                // @2
                MessageTree tree = m_queue.poll();
                if (tree != null) {
                    sendInternal(tree);
                    tree.setMessage(null);
                }
            } catch (Throwable t) {
                m_logger.error("Error when sending message over TCP socket!", t);
            }
        } 
        ...
    }
}


private void sendInternal(MessageTree tree) {
    // @1
    ChannelFuture future = m_manager.channel();
    ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K
    // @2
    m_codec.encode(tree, buf);
    int size = buf.readableBytes();
    Channel channel = future.channel();
    // @3
    channel.writeAndFlush(buf);
    if (m_statistics != null) {
        m_statistics.onBytes(size);
    }
}

In run():

@1: fetch the current channel from the ChannelManager and check that it is writable.

@2: poll one message tree from the queue and hand it to sendInternal.

In sendInternal():

@1: get the ChannelFuture from the ChannelManager.

@2: encode the message tree into a ByteBuf.

@3: write and flush the buffer to the channel, then record the byte count.

After encoding, the data sent over the wire looks like this:

PT1	app_test	172.19.0.1	172.19.0.1	main	30	http-nio-8088-exec-4	app_test-ac130001-463547-10030	null	null	nullt2022-11-18 19:36:57.628	URL	pageNameE2022-11-18  19:36:57.628	URL.Server	serverIp	0	ip=${serverIp}M2022-11-18 19:36:57.628		metric.key	C	1M2022-11-18  19:36:57.628		metric.key	T	5T2022-11-18  19:36:57.628	URL	pageName	0	29us

4. Server Internals

4.1. Architecture

The official architecture diagram is omitted here.

4.2. Initialization

  • Instantiate the Netty server and start listening on port 2280
  • Choose the transport according to the operating system (epoll is more efficient on Linux)
  • Register the message encoder and decoder
  • Set the network options

public void init() {
    try {
        // start on port 2280 by default
        startServer(m_port);
    } catch (Exception e) {
        m_logger.error(e.getMessage(), e);
    }
}

public synchronized void startServer(int port) throws InterruptedException {
    // configure differently depending on the operating system
    boolean linux = getOSMatches("Linux") || getOSMatches("LINUX");
    int threads = 24;
    ServerBootstrap bootstrap = new ServerBootstrap();

    // set the number of netty threads and the channel transport
    m_bossGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
    // epoll performs better on linux
    m_workerGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
    bootstrap.group(m_bossGroup, m_workerGroup);
    bootstrap.channel(linux ? EpollServerSocketChannel.class : NioServerSocketChannel.class);

    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            // register the message decoder and encoder
            pipeline.addLast("decode", new MessageDecoder());
            pipeline.addLast("encode", new ClientMessageEncoder());
        }
    });
    // network options
    bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
    bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
    bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

    try {
        m_future = bootstrap.bind(port).sync();
        m_logger.info("start netty server!");
    } catch (Exception e) {
        m_logger.error("Started Netty Server Failed:" + port, e);
    }
}

4.3. Decoding

The received bytes are decoded into a message tree, which is then handed to the message handler for consumption.

public class MessageDecoder extends ByteToMessageDecoder {
    private long m_processCount;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        if (buffer.readableBytes() < 4) {
            return;
        }
        buffer.markReaderIndex();
        // read the message length
        int length = buffer.readInt();
        buffer.resetReaderIndex();
        if (buffer.readableBytes() < length + 4) {
            return;
        }
        try {
            if (length > 0) {
                ByteBuf readBytes = buffer.readBytes(length + 4);

                readBytes.markReaderIndex();
                //readBytes.readInt();

                // decode the bytes into a MessageTree
                DefaultMessageTree tree = (DefaultMessageTree) CodecHandler.decode(readBytes);

                // readBytes.retain();
                readBytes.resetReaderIndex();
                tree.setBuffer(readBytes);

                // hand the MessageTree to the message handler for consumption
                m_handler.handle(tree);

                m_processCount++;

                long flag = m_processCount % CatConstants.SUCCESS_COUNT;

                if (flag == 0) {
                    m_serverStateManager.addMessageTotal(CatConstants.SUCCESS_COUNT);
                }
            } else {
                // client message is error
                buffer.readBytes(length);
                BufReleaseHelper.release(buffer);
            }
        } catch (Exception e) {
            m_serverStateManager.addMessageTotalLoss(1);
            m_logger.error(e.getMessage(), e);
        }
    }
}

public void handle(MessageTree tree) {
    if (m_consumer == null) {
        m_consumer = lookup(MessageConsumer.class);
    }

    try {
        m_consumer.consume(tree);
    } catch (Throwable e) {
        m_logger.error("Error when consuming message in " + m_consumer + "! tree: " + tree, e);
    }
}

4.4. Dispatching

After a message tree is decoded, the server uses the tree's timestamp to find the matching Period (each Period covers one clock hour, e.g. 1:00-2:00) and distributes the tree to the handlers of every report type.

Each report handler owns a list of PeriodTasks. The domain is hashed (Math.abs(domain.hashCode()) % length) to pick one task, and the message is put into that task's queue to be consumed asynchronously, which avoids blocking and improves throughput. A sketch of the hour lookup follows the consume snippet below.

public void consume(MessageTree tree) {
    long timestamp = tree.getMessage().getTimestamp();
    // one Period per hour
    Period period = m_periodManager.findPeriod(timestamp);

    if (period != null) {
        period.distribute(tree);
    } else {
        m_serverStateManager.addNetworkTimeError(1);
    }
}
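
findPeriod essentially truncates the timestamp to the start of its hour and looks the bucket up; a minimal sketch under assumed names (the real PeriodManager also starts and retires periods on a schedule):

// Hypothetical sketch of the hour-bucket lookup.
private static final long HOUR = 60 * 60 * 1000L;
private final Map<Long, Period> m_periods = new ConcurrentHashMap<Long, Period>();

Period findPeriod(long timestamp) {
    long startTime = timestamp - timestamp % HOUR; // e.g. 19:36 -> 19:00
    return m_periods.get(startTime);               // null if no active period covers it
}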


public void initialize() throws InitializationException {
    // create the Period manager
    m_periodManager = new PeriodManager(HOUR, m_analyzerManager, m_serverStateManager, m_logger);
    m_periodManager.init();

    Threads.forGroup("cat").start(m_periodManager);
}

public void distribute(MessageTree tree) {
    m_serverStateManager.addMessageTotal(tree.getDomain(), 1);
    boolean success = true;
    String domain = tree.getDomain();

    // iterate over the handlers of every report type
    for (Entry<String, List<PeriodTask>> entry : m_tasks.entrySet()) {
        List<PeriodTask> tasks = entry.getValue();
        ......
        if (tasks.size() > 1) {
            // pick the queue by hashing the domain
            index = Math.abs(domain.hashCode()) % length;
        }
        PeriodTask task = tasks.get(index);

        // enqueue the message; it is processed asynchronously
        boolean enqueue = task.enqueue(tree);

        ......
    }
    ......
}

Here m_tasks maps each report type to its list of PeriodTasks (a Map&lt;String, List&lt;PeriodTask&gt;&gt;). Each PeriodTask wraps a bounded queue and drains it asynchronously:

// enqueue a message
public boolean enqueue(MessageTree tree) {
   ......
   // m_queue is an ArrayBlockingQueue with a default size of 30000
   boolean result = m_queue.offer(tree);
   ......
}

// messages are processed asynchronously on multiple threads
public void run() {
    try {
        m_analyzer.analyze(m_queue);
    } catch (Exception e) {
        Cat.logError(e);
    }
}

public void analyze(MessageQueue queue) {
    while (!isTimeout() && isActive()) {
        // take a message from the queue
        MessageTree tree = queue.poll();

        if (tree != null) {
            try {
                // consume the message; this is an abstract method implemented by each analyzer
                process(tree);
            } catch (Throwable e) {
                m_errors++;

                if (m_errors == 1 || m_errors % 10000 == 0) {
                    Cat.logError(e);
                }
            }
        }
    }
    // if the current analyzer has timed out, drain the remaining messages in the queue and return
    while (true) {
        MessageTree tree = queue.poll();

        if (tree != null) {
            try {
                process(tree);
            } catch (Throwable e) {
                m_errors++;

                if (m_errors == 1 || m_errors % 10000 == 0) {
                    Cat.logError(e);
                }
            }
        } else {
            break;
        }
    }
}


protected abstract void process(MessageTree tree);

4.5. Analysis

Different analyzers examine the reported messages from different angles and aggregate the results into different reports.

CAT's built-in analyzers match the report types above: TransactionAnalyzer, EventAnalyzer, ProblemAnalyzer, HeartbeatAnalyzer, MetricAnalyzer, StateAnalyzer, plus the DumpAnalyzer covered in section 4.6 (the original figure listing them is omitted).

After analyzing messages, each analyzer stores its results in a Report object. Reports are held by the report manager (ReportManager) in the following structure:

// period start time --> domain --> Report object
Map<Long, Map<String, T>> m_reports = new ConcurrentHashMap<Long, Map<String, T>>();
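
Looking up (or lazily creating) the report for one hour and one domain is then a nested map access; a minimal sketch under assumed names:

// Hypothetical sketch of ReportManager.getHourlyReport(startTime, domain, createIfAbsent).
public T getHourlyReport(long startTime, String domain, boolean createIfAbsent) {
    Map<String, T> byDomain = m_reports.computeIfAbsent(startTime, k -> new ConcurrentHashMap<String, T>());
    T report = byDomain.get(domain);

    if (report == null && createIfAbsent) {
        // the delegate knows how to create an empty report of the right type
        report = m_reportDelegate.makeReport(domain, startTime, HOUR);
        byDomain.put(domain, report);
    }
    return report;
}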

Take TransactionAnalyzer as an example: it aggregates execution count, QPS, error count, failure rate, and response-time statistics (average response time, Tp quantiles), and stores the results in a TransactionReport.

public void process(MessageTree tree) {
    String domain = tree.getDomain();
    // fetch this period's report for the domain from the report manager
    TransactionReport report = m_reportManager.getHourlyReport(getStartTime(), domain, true);
    List<Transaction> transactions = tree.findOrCreateTransactions();

    for (Transaction t : transactions) {
        String data = String.valueOf(t.getData());

        // process the Transaction
        if (data.length() > 0 && data.charAt(0) == CatConstants.BATCH_FLAG) {
            processBatchTransaction(tree, report, t, data);
        } else {
            processTransaction(report, tree, t);
        }
    }

    if (System.currentTimeMillis() > m_nextClearTime) {
        m_nextClearTime = m_nextClearTime + TimeHelper.ONE_MINUTE;

        Threads.forGroup("cat").start(new Runnable() {

            @Override
            public void run() {
                cleanUpReports();
            }
        });
    }
}

private void processTransaction(TransactionReport report, MessageTree tree, Transaction t) {
    String type = t.getType();
    String name = t.getName();

    if (!m_filterConfigManager.discardTransaction(type, name)) {
        boolean valid = checkForTruncatedMessage(tree, t);

        if (valid) {
            // extract the ip
            String ip = tree.getIpAddress();
            TransactionType transactionType = findOrCreateType(report.findOrCreateMachine(ip), type);
            TransactionName transactionName = findOrCreateName(transactionType, name, report.getDomain());

            processTypeAndName(t, transactionType, transactionName, tree, t.getDurationInMillis());
        }
    }
}


private void processTypeAndName(Transaction t, TransactionType type, TransactionName name, MessageTree tree,double duration) {
    String messageId = tree.getMessageId();

    type.incTotalCount();
    name.incTotalCount();

    type.setSuccessMessageUrl(messageId);
    name.setSuccessMessageUrl(messageId);

    if (!t.isSuccess()) {
        type.incFailCount();
        name.incFailCount();

        String statusCode = formatStatus(t.getStatus());

        findOrCreateStatusCode(name, statusCode).incCount();
    }

    int allDuration = DurationComputer.computeDuration((int) duration);
    double sum = duration * duration;

    if (type.getMax() <= duration) {
        type.setLongestMessageUrl(messageId);
    }
    if (name.getMax() <= duration) {
        name.setLongestMessageUrl(messageId);
    }
    name.setMax(Math.max(name.getMax(), duration));
    name.setMin(Math.min(name.getMin(), duration));
    name.setSum(name.getSum() + duration);
    name.setSum2(name.getSum2() + sum);
    name.findOrCreateAllDuration(allDuration).incCount();

    type.setMax(Math.max(type.getMax(), duration));
    type.setMin(Math.min(type.getMin(), duration));
    type.setSum(type.getSum() + duration);
    type.setSum2(type.getSum2() + sum);
    type.findOrCreateAllDuration(allDuration).incCount();

    long current = t.getTimestamp() / 1000 / 60;
    int min = (int) (current % (60));
    boolean statistic = m_statisticManager.shouldStatistic(type.getId(), tree.getDomain());

    processNameGraph(t, name, min, duration, statistic, allDuration);
    processTypeRange(t, type, min, duration, statistic, allDuration);
}
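
Note that both sum and sum2 (the sum of squared durations) are accumulated above. Keeping these two running totals is what lets the report derive the mean and the standard deviation later without retaining individual samples; for n = totalCount:

// mean and standard deviation recovered from the running totals
double avg = sum / n;
double std = Math.sqrt(sum2 / n - avg * avg); // Var(X) = E[X^2] - (E[X])^2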


private void processNameGraph(Transaction t, TransactionName name, int min, double d, boolean statistic,
                        int allDuration) {
    int dk = formatDurationDistribute(d);

    Duration duration = name.findOrCreateDuration(dk);
    Range range = name.findOrCreateRange(min);

    duration.incCount();
    range.incCount();

    if (!t.isSuccess()) {
        range.incFails();
    }

    range.setSum(range.getSum() + d);
    range.setMax(Math.max(range.getMax(), d));
    range.setMin(Math.min(range.getMin(), d));

    if (statistic) {
        range.findOrCreateAllDuration(allDuration).incCount();
    }
}

private void processTypeRange(Transaction t, TransactionType type, int min, double d, boolean statistic,
                        int allDuration) {
    Range2 range = type.findOrCreateRange2(min);

    if (!t.isSuccess()) {
        range.incFails();
    }

    range.incCount();
    range.setSum(range.getSum() + d);
    range.setMax(Math.max(range.getMax(), d));
    range.setMin(Math.min(range.getMin(), d));

    if (statistic) {
        range.findOrCreateAllDuration(allDuration).incCount();
    }
}

When the following message tree arrives, observe how the report changes at each stage:

PT1	cat	devuser-pc	192.168.250.1	main	29	localhost-startStop-1	cat-c0a8fa01-463547-3538	null	nullt2022-11-18 19:36:37.025	SQL	config.findByNameE2022-11-18 19:36:37.391	SQL.Method	SELECT	0	["user-config"]E2022-11-18 19:36:37.391	SQL.Database	jdbc:mysql://127.0.0.1:3306/cat?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&socketTimeout=120000	0T2022-11-18 19:36:38.034	SQL	config.findByName	0	326665us	SELECT c.id,c.name,c.content,c.creation_date,c.modify_date FROM config c WHERE c.name = ?

1. When process begins, the TransactionReport is:

<?xml version="1.0" encoding="utf-8"?>
<transaction-report domain="cat" startTime="2022-11-18 19:00:00" endTime="2022-11-18 19:59:59">
</transaction-report>

2. After processNameGraph, the TransactionName is:

<?xml version="1.0" encoding="utf-8"?>
<name id="config.findByName" totalCount="1" failCount="0" failPercent="0.00" min="284.00" max="284.00" avg="0.0" sum="284.0" sum2="80656.0" std="0.0" tps="0.00" line95Value="0.00" line99Value="0.00" line999Value="0.00" line90Value="0.00" line50Value="0.00" line9999Value="0.00">
   <successMessageUrl>cat-c0a8fa01-463547-3538</successMessageUrl>
   <longestMessageUrl>cat-c0a8fa01-463547-3538</longestMessageUrl>
   <range value="28" count="1" sum="284.0" avg="0.0" fails="0" min="284.00" max="284.00" line95Value="0.00" line99Value="0.00" line999Value="0.00" line90Value="0.00" line50Value="0.00" line9999Value="0.00"/>
   <duration value="512" count="1"/>
</name>

3. After processTypeRange, the TransactionType is:

<?xml version="1.0" encoding="utf-8"?>
<type id="SQL" totalCount="1" failCount="0" failPercent="0.00" min="284.00" max="284.00" avg="0.0" sum="284.0" sum2="80656.0" std="0.0" tps="0.00" line95Value="0.00" line99Value="0.00" line999Value="0.00" line90Value="0.00" line50Value="0.00" line9999Value="0.00">
   <successMessageUrl>cat-c0a8fa01-463547-3538</successMessageUrl>
   <longestMessageUrl>cat-c0a8fa01-463547-3538</longestMessageUrl>
   <name id="config.findByName" totalCount="1" failCount="0" failPercent="0.00" min="284.00" max="284.00" avg="0.0" sum="284.0" sum2="80656.0" std="0.0" tps="0.00" line95Value="0.00" line99Value="0.00" line999Value="0.00" line90Value="0.00" line50Value="0.00" line9999Value="0.00">
      <successMessageUrl>cat-c0a8fa01-463547-3538</successMessageUrl>
      <longestMessageUrl>cat-c0a8fa01-463547-3538</longestMessageUrl>
      <range value="28" count="1" sum="284.0" avg="0.0" fails="0" min="284.00" max="284.00" line95Value="0.00" line99Value="0.00" line999Value="0.00" line90Value="0.00" line50Value="0.00" line9999Value="0.00"/>
      <duration value="512" count="1"/>
   </name>
   <range2 value="28" count="1" sum="284.0" avg="0.0" fails="0" min="284.00" max="284.00" line95Value="0.00" line99Value="0.00" line999Value="0.00" line90Value="0.00" line50Value="0.00" line9999Value="0.00"/>
</type>

4. After process finishes, the TransactionReport is:

<?xml version="1.0" encoding="utf-8"?>
<transaction-report domain="cat" startTime="2022-11-18 19:00:00" endTime="2022-11-18 19:59:59">
   <machine ip="192.168.250.1">
      <type id="SQL" totalCount="1" failCount="0" failPercent="0.00" min="284.00" max="284.00" avg="0.0" sum="284.0" sum2="80656.0" std="0.0" tps="0.00" line95Value="0.00" line99Value="0.00" line999Value="0.00" line90Value="0.00" line50Value="0.00" line9999Value="0.00">
         <successMessageUrl>cat-c0a8fa01-463547-3538</successMessageUrl>
         <longestMessageUrl>cat-c0a8fa01-463547-3538</longestMessageUrl>
         <name id="config.findByName" totalCount="1" failCount="0" failPercent="0.00" min="284.00" max="284.00" avg="0.0" sum="284.0" sum2="80656.0" std="0.0" tps="0.00" line95Value="0.00" line99Value="0.00" line999Value="0.00" line90Value="0.00" line50Value="0.00" line9999Value="0.00">
            <successMessageUrl>cat-c0a8fa01-463547-3538</successMessageUrl>
            <longestMessageUrl>cat-c0a8fa01-463547-3538</longestMessageUrl>
            <range value="28" count="1" sum="284.0" avg="0.0" fails="0" min="284.00" max="284.00" line95Value="0.00" line99Value="0.00" line999Value="0.00" line90Value="0.00" line50Value="0.00" line9999Value="0.00"/>
            <duration value="512" count="1"/>
         </name>
         <range2 value="28" count="1" sum="284.0" avg="0.0" fails="0" min="284.00" max="284.00" line95Value="0.00" line99Value="0.00" line999Value="0.00" line90Value="0.00" line50Value="0.00" line9999Value="0.00"/>
      </type>
   </machine>
</transaction-report>

4.6. Storage

Storage falls into two categories:

  • raw data (the MessageTree)
  • the various reports (Transaction, Event, and so on)

4.6.1. Raw data

Raw data storage is handled by the DumpAnalyzer. Data is saved under /data/appdatas/cat/bucket/dump/{yyyyMMdd}/{HH}. When logs are written to disk, each application produces two kinds of files: a data file (.dat) and an index file (.idx).
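
A sketch of how the directory name follows from the hour segment of a MessageId (the helper is hypothetical):

// Hypothetical: hour is segment 3 of the MessageId, i.e. hours since the epoch.
static String dumpDir(int hour) {
    java.util.Date start = new java.util.Date(hour * 3600_000L);
    return "/data/appdatas/cat/bucket/dump/" + new java.text.SimpleDateFormat("yyyyMMdd/HH").format(start);
}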


4.6.1.1. DumpAnalyzer

DumpAnalyzer parses the messageId from the message tree; from the messageId one can obtain the log's domain, ip, and other fields. The tree is then handed to the MessageDumper for further processing:

public void process(MessageTree tree) {
    try {
        MessageId messageId = MessageId.parse(tree.getMessageId());

        if (!shouldDiscard(messageId)) {
            processWithStorage(tree, messageId, messageId.getHour());
        }
    } catch (Exception ignored) {
    }
}

private void processWithStorage(MessageTree tree, MessageId messageId, int hour) {
    MessageDumper dumper = m_dumperManager.find(hour);

    tree.setFormatMessageId(messageId);

    if (dumper != null) {
        dumper.process(tree);
    } else {
        m_serverStateManager.addPigeonTimeError(1);
    }
}

4.6.1.2. MessageDumper

MessageDumper's processing happens in its process method: it hashes the message's ip modulo the processor count to pick a queue and enqueues the message there. Each queue is drained asynchronously by its own MessageProcessor (20 by default).

private BlockDumperManager m_blockDumperManager;

private BucketManager m_bucketManager;

private ServerStatisticManager m_statisticManager;

private ServerConfigManager m_configManager;

// buffered queues (20 by default)
private List<BlockingQueue<MessageTree>> m_queues = new ArrayList<BlockingQueue<MessageTree>>();

// processors (20 by default)
private List<MessageProcessor> m_processors = new ArrayList<MessageProcessor>();

private AtomicInteger m_failCount = new AtomicInteger(-1);

private Logger m_logger;

private long m_total;

// 20 threads by default
private int m_processThreads;

public void initialize(int hour) {
    // 20 threads by default
    int processThreads = m_configManager.getMessageProcessorThreads();
    m_processThreads = processThreads;

    for (int i = 0; i < processThreads; i++) {
        BlockingQueue<MessageTree> queue = new ArrayBlockingQueue<MessageTree>(10000);
        MessageProcessor processor = lookup(MessageProcessor.class);

        m_queues.add(queue);
        m_processors.add(processor);

        processor.initialize(hour, i, queue);
        Threads.forGroup("Cat").start(processor);
    }
}

public void process(MessageTree tree) {
    // parse the MessageId; it yields the log's domain, ip, and other fields
    MessageId id = tree.getFormatMessageId();
    String domain = id.getDomain();

    // hash the ip to decide which queue gets the message:
    // (Math.abs(id.getIpAddressInHex().hashCode())) % (m_processThreads)
    int index = getIndex(id.getIpAddressInHex());
    // the queue is an ArrayBlockingQueue of size 10000
    BlockingQueue<MessageTree> queue = m_queues.get(index);

    // enqueue the message
    boolean success = queue.offer(tree);
    ......
}


private int getIndex(String key) {
    // m_processThreads defaults to 20
    return (Math.abs(key.hashCode())) % (m_processThreads);
}


private void closeMessageProcessor() throws InterruptedException {
    ......
    for (MessageProcessor processor : m_processors) {
        processor.shutdown();
        super.release(processor);
    }
}

4.6.1.3. MessageProcessor

DefaultMessageProcessor handles each message as follows. It first looks up the domain's Block in a Map&lt;domain, Block&gt;. The message is compressed with the Snappy algorithm and appended to the Block's in-memory buffer, and the message's MessageId and its offset within the Block are recorded in a Map&lt;MessageId, offset&gt;. A log entry can therefore be located later from its MessageId plus the matching Block. (When a log is appended to a Block, its length is written in the first 4 bytes, so the entry can be delimited from its offset alone.) When a Block fills up to 256 KB, a new Block is created and the full one is handed to the BlockDumper to be written to disk.
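
To make the offset bookkeeping concrete: given an index entry (the block's address in the .dat file plus the offset inside the uncompressed block), a message could be read back as follows. This is a hypothetical sketch, not CAT's actual reader; SnappyInputStream is from org.xerial.snappy:

// Hypothetical sketch of reading one message back from a dump file.
byte[] readMessage(File dataFile, long blockAddress, int blockOffset) throws IOException {
    try (RandomAccessFile raf = new RandomAccessFile(dataFile, "r")) {
        raf.seek(blockAddress);
        int blockLen = raf.readInt();          // each block is length-prefixed (see DataHelper below)
        byte[] compressed = new byte[blockLen];
        raf.readFully(compressed);

        try (DataInputStream in = new DataInputStream(
                new SnappyInputStream(new ByteArrayInputStream(compressed)))) {
            in.skipBytes(blockOffset);         // the offset refers to the *uncompressed* data
            int msgLen = in.readInt();         // each message is length-prefixed too
            byte[] message = new byte[msgLen];
            in.readFully(message);
            return message;
        }
    }
}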

The DefaultMessageProcessor code:

private BlockDumperManager m_blockDumperManager;

private MessageFinderManager m_finderManager;

private ServerConfigManager m_configManger;

private BlockDumper m_dumper;

// index of this MessageProcessor
private int m_index;

// new ArrayBlockingQueue<MessageTree>(10000)
private BlockingQueue<MessageTree> m_queue;

// key: application name; value: a Block, an in-memory buffer holding logs
private ConcurrentHashMap<String, Block> m_blocks = new ConcurrentHashMap<String, Block>();

private int m_hour;

private AtomicBoolean m_enabled;

private CountDownLatch m_latch;

private int m_count;


public void run() {
    MessageTree tree;
    try {
        while (m_enabled.get() || !m_queue.isEmpty()) {
            tree = pollMessage();

            if (tree != null) {
                processMessage(tree);
            }
        }
    } catch (InterruptedException e) {
    }

    // on shutdown, flush all remaining blocks to the dumper
    for (Block block : m_blocks.values()) {
        try {
            block.finish();

            m_dumper.dump(block);
        } catch (IOException e) {
            // ignore it
        }
    }

    m_blocks.clear();
    m_latch.countDown();
}


private void processMessage(MessageTree tree) {
    MessageId id = tree.getFormatMessageId();
    String domain = id.getDomain();
    int hour = id.getHour();

    // look up the application's block
    Block block = m_blocks.get(domain);

    // if no block exists yet, create one
    if (block == null) {
        block = new DefaultBlock(domain, hour);
        m_blocks.put(domain, block);
    }

    ByteBuf buffer = tree.getBuffer();

    try {
        // when the block has filled up to 256K
        if (block.isFull()) {
            block.finish();
            // hand the block to the BlockDumper to be written to disk
            m_dumper.dump(block);
            // start a new block
            block = new DefaultBlock(domain, hour);
            m_blocks.put(domain, block);
        }

        // otherwise append the log to the block's in-memory buffer
        block.pack(id, buffer);
    } catch (Exception e) {
        Cat.logError(e);
    } finally {
        ReferenceCountUtil.release(buffer);
    }
}

The DefaultBlock code:

// a block holds up to 256K
private static final int MAX_SIZE = 256 * 1024;

private String m_domain;

private int m_hour;

// the data stored in this block
private ByteBuf m_data;

private int m_offset;

// records each log's offset within the block
private Map<MessageId, Integer> m_offsets = new LinkedHashMap<MessageId, Integer>();

// a SnappyOutputStream
private volatile OutputStream m_out;

private volatile boolean m_isFlush;

public void pack(MessageId id, ByteBuf buf) throws IOException {
    synchronized (m_out) {
        int len = buf.readableBytes();

        buf.readBytes(m_out, len);
        // record the offset at which this log was written
        m_offsets.put(id, m_offset);
        m_offset += len;
    }
}

SnappyOutputStream comes from the snappy-java library, which implements Google's Snappy compression algorithm. It compresses data on the fly as it is written to the underlying stream, shrinking the data and saving storage space and network bandwidth, which is useful when handling or transferring large volumes of data.
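
A minimal usage sketch (SnappyOutputStream lives in org.xerial.snappy; this mirrors how pack() writes into m_out):

ByteArrayOutputStream sink = new ByteArrayOutputStream();
try (OutputStream out = new SnappyOutputStream(sink)) {
    out.write("one encoded message".getBytes(StandardCharsets.UTF_8)); // compressed as it is written
}
byte[] compressedBlock = sink.toByteArray(); // roughly what a finished block holds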

4.6.1.4. BlockDumper

BlockDumper hashes the Block's domain to pick a writer queue and hands the Block to a DefaultBlockWriter, which performs the disk write.

private ServerStatisticManager m_statisticManager;

private ServerConfigManager m_configManager;

private List<BlockingQueue<Block>> m_queues = new ArrayList<BlockingQueue<Block>>();

private List<BlockWriter> m_writers = new ArrayList<BlockWriter>();

private int m_failCount = -1;

private Logger m_logger;


public void initialize(int hour) {
    // 5 threads by default
    int threads = m_configManager.getMessageDumpThreads();

    for (int i = 0; i < threads; i++) {
        BlockingQueue<Block> queue = new ArrayBlockingQueue<Block>(10000);
        BlockWriter writer = lookup(BlockWriter.class);

        m_queues.add(queue);
        m_writers.add(writer);

        writer.initialize(hour, i, queue);
        Threads.forGroup("Cat").start(writer);
    }
}

public void dump(Block block) throws IOException {
    String domain = block.getDomain();
    int hash = Math.abs(domain.hashCode());
    int index = hash % m_writers.size();
    BlockingQueue<Block> queue = m_queues.get(index);
    boolean success = queue.offer(block);

    if (!success) {
        m_statisticManager.addBlockLoss(1);

        if ((++m_failCount % 100) == 0) {
            Cat.logError(new BlockQueueFullException("Error when adding block to queue, fails: " + m_failCount));
            m_logger.info("block dump queue is full " + m_failCount + " index:" + index);
        }
    } else {
        m_statisticManager.addBlockTotal(1);
    }
}

4.6.1.5. BlockWriter

BlockWriter writes each Block to disk: it obtains the matching LocalBucket from the bucket manager and writes the Block's data through it.

private BucketManager m_bucketManager;

private ServerStatisticManager m_statisticManager;

private int m_index;

private BlockingQueue<Block> m_queue;

private long m_hour;

private int m_count;

private AtomicBoolean m_enabled;

private CountDownLatch m_latch;

public void initialize(int hour, int index, BlockingQueue<Block> queue) {
    m_hour = hour;
    m_index = index;
    m_queue = queue;
    m_enabled = new AtomicBoolean(true);
    m_latch = new CountDownLatch(1);
}

public void run() {
    String ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();

    try {
        while (m_enabled.get() || !m_queue.isEmpty()) {
            Block block = m_queue.poll(5, TimeUnit.MILLISECONDS);

            if (block != null) {
                long time = System.currentTimeMillis();
                processBlock(ip, block);
                long duration = System.currentTimeMillis() - time;

                m_statisticManager.addBlockTime(duration);
            }
        }
    } catch (InterruptedException e) {
        // ignore it
    }

    m_latch.countDown();
}

private void processBlock(String ip, Block block) {
    try {
        Bucket bucket = m_bucketManager.getBucket(block.getDomain(), ip, block.getHour(), true);
        boolean monitor = (++m_count) % 1000 == 0;

        if (monitor) {
            Transaction t = Cat.newTransaction("Block", block.getDomain());

            try {
                bucket.puts(block.getData(), block.getOffsets());
            } catch (Exception e) {
                Cat.logError(ip, e);
                t.setStatus(e);
            }

            t.setStatus(Transaction.SUCCESS);
            t.complete();
        } else {
            try {
                bucket.puts(block.getData(), block.getOffsets());
            } catch (Exception e) {
                Cat.logError(ip, e);
            }
        }
    } catch (Exception e) {
        Cat.logError(ip, e);
    } catch (Error e) {
        Cat.logError(ip, e);
    } finally {
        block.clear();
    }
}

4.6.1.6. LocalBucket

Each application maps to one LocalBucket per hour. When logs are stored on disk, each application produces two files per hour: a data file (.dat) and an index file (.idx).

public synchronized void puts(ByteBuf data, Map<MessageId, Integer> mappings) throws IOException {
    long dataOffset = m_data.getDataOffset();
    // write the data into the data file (.dat)
    m_data.write(data);

    for (Map.Entry<MessageId, Integer> e : mappings.entrySet()) {
        MessageId id = e.getKey();
        int offset = e.getValue();
        // write the index entry into the index file (.idx)
        m_index.write(id, dataOffset, offset);
    }
}

4.6.1.6.1. DataHelper

Handles reads and writes of the data file (.dat). Storage is simple: the Block's data is appended to the file, prefixed with its 4-byte length.

private void write(ByteBuf data) throws IOException {
    int len = data.readableBytes();

    m_out.writeInt(len);
    data.readBytes(m_out, len);
    m_offset += len + 4;
}

4.6.1.6.2. IndexHelper

IndexHelper handles reads and writes of the index file (.idx). For every MessageId it records the offset of the corresponding log in the data file, so a log can be found through the index file alone.

The structure is as follows (figure omitted):

Level-1 index:

  • consists of 4096 entries of 8 bytes each
  • each 64-bit entry stores the IP in the high 32 bits and index/4k in the low 32 bits
  • the first 8-byte slot stores a magic number (shown as -1 in the figure), marking the file as valid
  • the remaining 4095 entries map one-to-one, in order, onto the segments of the level-2 index

Level-2 index:

  • consists of 4095 segments, each made of 4096 entries of 8 bytes
  • each 64-bit entry stores the block's start address in the high 40 bits and the offset within the uncompressed block in the low 24 bits

private void write(MessageId id, long blockAddress, int blockOffset) throws IOException {
    long position = m_header.getOffset(id.getIpAddressValue(), id.getIndex(), true);
    long address = position / SEGMENT_SIZE;
    int offset = (int) (position % SEGMENT_SIZE);

    // locate the segment
    Segment segment = getSegment(id.getIpAddressInHex(), address);

    long value = (blockAddress << 24) + blockOffset;

    if (segment != null) {
        // level-2 index entry: high 40 bits are the block address, low 24 bits the offset in the uncompressed block
        segment.writeLong(offset, value);
    } else {
        if (m_count.incrementAndGet() % 1000 == 0) {
            Cat.logEvent("AbnormalBlock", id.getDomain());
        }
        if (m_nioEnabled) {
            m_indexChannel.position(position);

            ByteBuffer buf = ByteBuffer.allocate(8);
            buf.putLong(value);
            buf.flip();
            m_indexChannel.write(buf);
        } else {
            m_file.seek(position);
            m_file.writeLong(value);
        }
    }
}
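
Decoding an index entry is the inverse bit arithmetic (a short sketch):

// Sketch: split one 8-byte level-2 index entry back into its two parts.
static long blockAddress(long entry) { return entry >>> 24; }            // high 40 bits
static int blockOffset(long entry) { return (int) (entry & 0xFFFFFF); }  // low 24 bits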

4.6.2. Report data

Each kind of report produces two files: a data file and an index file.

Directory

Saved under /data/appdatas/cat/bucket/report/{yyyyMMdd}/{HH}/{index}:

Data file

Stores all the report data of one analyzer. A report-transaction file, for example, holds the records described in storeById below (sample omitted).

Index file

Stores the position index of the reports inside the data file. A report-transaction.idx file, for example, holds one "domain&lt;TAB&gt;offset" line per report (sample omitted).

Source code

Take TransactionAnalyzer as an example: it persists its reports in doCheckpoint.

public synchronized void doCheckpoint(boolean atEnd) {
    if (atEnd && !isLocalMode()) {
        m_reportManager.storeHourlyReports(getStartTime(), StoragePolicy.FILE_AND_DB, m_index);
    } else {
        m_reportManager.storeHourlyReports(getStartTime(), StoragePolicy.FILE, m_index);
    }
}

public void storeHourlyReports(long startTime, StoragePolicy policy, int index) {
    ......
    // fetch the reports of this period
    Map<String, T> reports = m_reports.get(startTime);
    ReportBucket bucket = m_bucketManager.getReportBucket(startTime, m_name, index);
    // pre-processing before saving
    m_reportDelegate.beforeSave(reports);
    try {
        // save
        storeFile(reports, bucket);
    } finally {
        m_bucketManager.closeBucket(bucket);
    }
    ......
}

 
private void storeFile(Map<String, T> reports, ReportBucket bucket) {
    for (T report : reports.values()) {
        try {
            // get the domain
            String domain = m_reportDelegate.getDomain(report);
            // serialize the report to xml
            String xml = m_reportDelegate.buildXml(report);
            // store the report
            bucket.storeById(domain, xml);
        } catch (Exception e) {
            Cat.logError(e);
        }
    }
}

public ReportBucket getReportBucket(long timestamp, String name, int index) throws IOException {
    Date date = new Date(timestamp);
    ReportBucket bucket = lookup(ReportBucket.class);

    bucket.initialize(name, date, index);
    return bucket;
}

public void initialize(String name, Date timestamp, int index) throws IOException {
    // report storage directory
    m_baseDir = Cat.getCatHome() + "bucket/report";
    m_writeLock = new ReentrantLock();
    m_readLock = new ReentrantLock();

    // {yyyyMMdd}/{HH}/{index}/report-transaction (e.g. 20220825/19/0/report-transaction)
    String logicalPath = m_pathBuilder.getReportPath(name, timestamp, index);

    // data file
    File dataFile = new File(m_baseDir, logicalPath);
    // index file
    File indexFile = new File(m_baseDir, logicalPath + ".idx");

    if (indexFile.exists()) {
        loadIndexes(indexFile);
    }

    final File dir = dataFile.getParentFile();

    if (!dir.exists() && !dir.mkdirs()) {
        throw new IOException(String.format("Fail to create directory(%s)!", dir));
    }

    m_logicalPath = logicalPath;
    m_writeDataFile = new BufferedOutputStream(new FileOutputStream(dataFile, true), 8192);
    m_writeIndexFile = new BufferedOutputStream(new FileOutputStream(indexFile, true), 8192);
    m_writeDataFileLength = dataFile.length();
    m_readDataFile = new RandomAccessFile(dataFile, "r");
}

public boolean storeById(String id, String report) throws IOException {
    // report content
    byte[] content = report.getBytes("utf-8");
    // report length
    int length = content.length;
    byte[] num = String.valueOf(length).getBytes("utf-8");

    m_writeLock.lock();

    try {
        // write the report data: a length line followed by the xml
        m_writeDataFile.write(num);
        m_writeDataFile.write('\n');
        m_writeDataFile.write(content);
        m_writeDataFile.write('\n');
        m_writeDataFile.flush();

        long offset = m_writeDataFileLength;
        String line = id + '\t' + offset + '\n';
        byte[] data = line.getBytes("utf-8");

        // write the index entry: "domain\toffset"
        m_writeDataFileLength += num.length + 1 + length + 1;
        m_writeIndexFile.write(data);
        m_writeIndexFile.flush();
        m_idToOffsets.put(id, offset);
        return true;
    } finally {
        m_writeLock.unlock();
    }
}
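
For illustration, after storing the first report for domain cat into an empty bucket, the two files would contain roughly the following (the length 152 is hypothetical):

report-transaction (data file):

152
<?xml version="1.0" encoding="utf-8"?><transaction-report domain="cat" ...></transaction-report>

report-transaction.idx (index file):

cat	0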

4.7. Queries

4.7.1. Report data

Taking the Transaction report as an example, the source looks like this:

@OutboundActionMeta(name = "t")
public void handleOutbound(Context ctx) {

    Model model = new Model(ctx);
    Payload payload = ctx.getPayload();

    String domain = payload.getDomain();
    String ipAddress = payload.getIpAddress();
    String group = payload.getGroup();

    // query the hourly report
    TransactionReport report = getHourlyReport(payload);

    // when querying the ALL dimension, aggregate the metrics of all machines
    report = m_mergeHelper.mergeAllMachines(report, ipAddress);

    if (report != null) {
        model.setReport(report);
        buildTransactionMetaInfo(model, payload, report);
    }

    if (payload.isXml()) {
        // return the xml view
        m_xmlViewer.view(ctx, model);
    } else {
        // return the jsp view
        m_jspViewer.view(ctx, model);
    }
}

private TransactionReport getHourlyReport(Payload payload) {
    String domain = payload.getDomain();
    String ipAddress = payload.getIpAddress();
    ModelRequest request = new ModelRequest(domain, payload.getDate()).setProperty("type", payload.getType())
                            .setProperty("ip", ipAddress);

    if (m_service.isEligable(request)) {
        ModelResponse<TransactionReport> response = m_service.invoke(request);
        TransactionReport report = response.getModel();

        return report;
    } else {
        throw new RuntimeException("Internal error: no eligable transaction service registered for " + request + "!");
    }
}
public String buildReport(ModelRequest request, ModelPeriod period, String domain, ApiPayload payload)
                        throws Exception {
    // 1. first look in memory
    List<TransactionReport> reports = super.getReport(period, domain);
    TransactionReport report = null;

    if (reports != null) {
        report = new TransactionReport(domain);
        TransactionReportMerger merger = new TransactionReportMerger(report);

        for (TransactionReport tmp : reports) {
            tmp.accept(merger);
        }
    }

    // 2. if not in memory, read the report data from disk
    if ((report == null || report.getIps().isEmpty()) && period.isLast()) {
        long startTime = request.getStartTime();
        report = getReportFromLocalDisk(startTime, domain);
    }

    return filterReport(payload, report);
}

private TransactionReport getReportFromLocalDisk(long timestamp, String domain) throws Exception {
    TransactionReport report = new TransactionReport(domain);
    TransactionReportMerger merger = new TransactionReportMerger(report);

    report.setStartTime(new Date(timestamp));
    report.setEndTime(new Date(timestamp + TimeHelper.ONE_HOUR - 1));

    for (int i = 0; i < getAnalyzerCount(); i++) {
        ReportBucket bucket = null;
        try {
            // directory: /data/appdatas/cat/bucket/report/{yyyyMMdd}/{HH}/0/report-transaction
            bucket = m_bucketManager.getReportBucket(timestamp, TransactionAnalyzer.ID, i);
            // read the report data from disk
            String xml = bucket.findById(domain);

            if (xml != null) {
                TransactionReport tmp = DefaultSaxParser.parse(xml);

                tmp.accept(merger);
            }
        } finally {
            if (bucket != null) {
                m_bucketManager.closeBucket(bucket);
            }
        }
    }
    return report;
}

Reading the report data from disk:

public String findById(String id) throws IOException {
    Long offset = m_idToOffsets.get(id);

    if (offset != null) {
        m_readLock.lock();

        try {
            m_readDataFile.seek(offset);

            int num = Integer.parseInt(m_readDataFile.readLine());
            byte[] bytes = new byte[num];

            m_readDataFile.readFully(bytes);

            return new String(bytes, "utf-8");
        } catch (Exception e) {
            m_logger.error(String.format("Error when reading file(%s)!", m_readDataFile), e);
        } finally {
            m_readLock.unlock();
        }
    }

    return null;
}

The returned string is the report XML, as in the samples shown earlier.