基于perf的内存访问采样
进程性能的剖析,包括了内存访问模式的分析。这通常都需要对进程访问内存做采样,从而得到进程访问的虚拟地址、发起访问的CPU等信息。有了这些信息,就可以做诸如内存热点分析、优化数据结构等工作。 在Intel处理器中,有丰富的Performance Monitor相关的硬件资源。Linux Perf子系统对Intel的这些硬件功能进行了封装。Perf有两种工作模式,一种是counting,一种是sampling。counting模式就是常用的模式,它只计数,比如统计cache miss次数。而sampling模式则更加高级,当某个事件的计数达到某个预定值时,CPU就会把当前状态的快照保存下来。因此,在sampling模式下,可以得到CPU在各个时间点的运行状态的采样。状态包括那个瞬间的CPU core、各个寄存器的值、函数调用桟、正在访问的虚拟内存地址、正在执行的进程ID等等信息。Intel把这种采用功能叫做PEBS(Processor Event Based Sampling)。 并不是所有的硬件事件都支持PEBS。不同的CPU支持的事件可以在《#HREF"https://software.intel.com/en-us/articles/intel-sdm"#-HREF1Intel® 64 and IA-32 Architectures Software Developer Manuals#-HREF2》,比如 1.png 最常用也最通用的事件是ALL_LOADS和ALL_STORES,代码分别是D081河D082。 Perf子系统的使用可以参考perf_event_open()的手册,即 +++code man perf_event_open ---code 我在《#HREF"../../2017/Linux perf子系统的使用(三)——采样(poll方式)/index.html"#-HREF1Linux perf子系统的使用(三)——采样(poll方式)#-HREF2》一文中给出了Perf sampling的使用。但是当时的实现存在如下两个弊端: #OL #LI使用mmap()创建环形队列时,使用read-only映射,无法与kernel同步,因此如果用户态读取不及时,kernel会覆盖未读的数据,从而导致数据丢失以及后续数据错位。#-LI #LI只能对单个事件做poll(),没法对一个集合做poll。 #-OL 当时更多地是为了演示perf sampling接口的用法。而这篇博客则是为了给出一个通用的、几乎是产品级的实现。 我的代码都需要依赖common.h,它提供了错误处理以及各种小工具的宏定义: +++code #ifndef COMMON_H #define COMMON_H #include <errno.h> #include <stdio.h> #include <assert.h> #include <stdint.h> #include <stdlib.h> #include <string.h> #define ERROR(cleanup, ret, show_errstr, msgs...) \ ({ \ const char* _errstr = (show_errstr) ? strerror(errno) : ""; \ (cleanup); \ fprintf(stderr, "[<%s> @ %s: %d]: ", __FUNCTION__, __FILE__, __LINE__); \ fprintf(stderr, ##msgs); \ fprintf(stderr, "%s\n", _errstr); \ return (ret); \ }) #ifndef likely #define likely(x) __builtin_expect(!!(x), 1) #endif #ifndef unlikely #define unlikely(x) __builtin_expect(!!(x), 0) #endif #ifndef MIN2 #define MIN2(a, b) \ ({ \ typeof(a) x = (a); \ typeof(b) y = (b); \ x < y ? x : y; \ }) #endif #ifndef MAX2 #define MAX2(a, b) \ ({ \ typeof(a) x = (a); \ typeof(b) y = (b); \ x > y ? x : y; \ }) #endif #endif ---code =======================阶段一:信道Channel======================== 首先,我们封装一个“信道”的概念。一个信道只能对一个进程的一个事件做采样。 channel.h +++code #ifndef CHANNEL_H #define CHANNEL_H #include "common.h" #include <unistd.h> class Channel { public: enum Type { CHANNEL_LOAD = 0x81D0, // sample load instructions CHANNEL_STORE = 0x82D0, // sample store instructions }; struct Sample { Type type; uint32_t cpu; // on which cpu(core) this sample happens uint32_t pid; // in which process(pid) and thread(tid) this sample happens uint32_t tid; uint64_t address; // the virtual address in this process to be accessed }; Channel(); ~Channel(); /* Initialize the Channel. * pid: the process to be sampled * type: type of instructions to be sampled * RETURN: 0 if OK, or a negative error code * NOTE: after calling bind(), the Channel remains disabled until setPeriod() is called. */ int bind(pid_t pid, Type type); /* De-initialize the Channel. * NOTE: after calling unbind(), the Channel go back to uninitialized. */ void unbind(); /* Set the sample period. * Sample period means that a sample is triggered every how many instructions. * For example, if period is set to be 10000, then a sample happens every 10000 instructions. * period: the period * RETURN: 0 if OK, or a negative error code * NOTE: a zero period disables this Channel. And there is a minimal threshold on it, * <period> is invalid if less than the threshold. The threshold varies between * different hardwares. */ int setPeriod(unsigned long period); /* Read a sample from this Channel. * sample: the buffer to receive the sample * RETURN: 0 if OK, -EAGAIN if not available, or a negative error code */ int readSample(Sample* sample); /* Get the pid of target process. * RETURN: pid, or a meaningless value if uninitialized. */ pid_t getPid(); /* Get the type to sample. * RETURN: type, or a meaningless value if uninitialized. */ Type getType(); /* Get the file descriptor from perf_event_open(). * RETURN: the file descriptor, or -1 if uninitialized. * NOTE: Be careful with the fd, a wrong use of it will disturb the logic of this Channel. */ int getPerfFd(); private: pid_t m_pid; // pid of target process Type m_type; // type int m_fd; // file descriptor from perf_event_open() uint64_t m_id; // sample id of each record void* m_buffer; // ring buffer and its header unsigned long m_period; // sample_period }; #endif ---code 可以看到,Channel::Type的本质就是PEBS的事件代码。由于Intel是小端架构,因此Manual上的事件代码的字节序需要反过来写。如果将来需要扩展别的事件,只需要根据Manual增加新的Type即可。 channel.cpp: +++code #include "channel.h" #include <unistd.h> #include <sys/mman.h> #include <sys/ioctl.h> #include <sys/syscall.h> #include <linux/perf_event.h> #define WAKEUP_EVENTS 1 #define INIT_SAMPLE_PERIOD 100000 #define PAGE_SIZE 4096 #define RING_BUFFER_PAGES 4 #define MMAP_SIZE ((1 + RING_BUFFER_PAGES) * PAGE_SIZE) #define READ_MEMORY_BARRIER() __builtin_ia32_lfence() // wrapper of perf_event_open() syscall static int perf_event_open(struct perf_event_attr *attr, pid_t pid, int cpu, int group_fd, unsigned long flags) { return syscall(__NR_perf_event_open, attr, pid, cpu, group_fd, flags); } Channel::Channel() { m_fd = -1; } Channel::~Channel() { unbind(); } int Channel::bind(pid_t pid, Type type) { if(m_fd >= 0) ERROR({}, -EINVAL, false, "this Channel has already bound"); struct perf_event_attr attr; memset(&attr, 0, sizeof(struct perf_event_attr)); attr.type = PERF_TYPE_RAW; attr.config = (uint64_t)type; attr.size = sizeof(struct perf_event_attr); attr.sample_period = INIT_SAMPLE_PERIOD; // sample id, pid, tid, address and cpu attr.sample_type = PERF_SAMPLE_IDENTIFIER | PERF_SAMPLE_TID | PERF_SAMPLE_ADDR | PERF_SAMPLE_CPU; attr.disabled = 1; attr.exclude_kernel = 1; attr.precise_ip = 3; attr.wakeup_events = WAKEUP_EVENTS; // open perf event int fd = perf_event_open(&attr, pid, -1, -1, 0); if(fd < 0) { int ret = -errno; ERROR({}, ret, true, "perf_event_open(&attr, %d, -1, -1, 0) failed: ", pid); } // create ring buffer void* buffer = mmap(NULL, MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if(buffer == MAP_FAILED) { int ret = -errno; ERROR(close(fd), ret, true, "mmap(NULL, %u, PROT_READ | PROT_WRITE, MAP_SHARED, %d, 0)" " failed: ", MMAP_SIZE, fd); } // get id uint64_t id; int ret = ioctl(fd, PERF_EVENT_IOC_ID, &id); if(ret < 0) { int ret = -errno; ERROR({ munmap(buffer, MMAP_SIZE); close(fd); }, ret, true, "ioctl(%d, PERF_EVENT_IOC_ID, &id) failed: ", fd); } m_pid = pid; m_type = type; m_fd = fd; m_id = id; m_buffer = buffer; m_period = 0; return 0; } void Channel::unbind() { if(m_fd < 0) return; int ret = munmap(m_buffer, MMAP_SIZE); assert(ret == 0); ret = close(m_fd); assert(ret == 0); m_fd = -1; } int Channel::setPeriod(unsigned long period) { if(m_fd < 0) ERROR({}, -EINVAL, false, "this Channel has not bound yet"); if(period == m_period) return 0; int ret; // disable channel if(period == 0) { ret = ioctl(m_fd, PERF_EVENT_IOC_DISABLE, 0); if(ret < 0) { ret = -errno; ERROR({}, ret, true, "ioctl(%d, PERF_EVENT_IOC_DISABLE, 0) failed: ", m_fd); } m_period = 0; return 0; } // set new period ret = ioctl(m_fd, PERF_EVENT_IOC_PERIOD, &period); if(ret < 0) { ret = -errno; ERROR({}, ret, true, "ioctl(%d, PERF_EVENT_IOC_PERIOD, &(%lu)) failed: ", m_fd, period); } // if channel was disabled, enable it if(m_period == 0) { ret = ioctl(m_fd, PERF_EVENT_IOC_ENABLE, 0); if(ret < 0) { ret = -errno; ERROR({}, ret, true, "ioctl(%d, PERF_EVENT_IOC_ENABLE, 0) failed: ", m_fd); } } m_period = period; return 0; } // see man page for perf_event_open() struct perf_sample { struct perf_event_header header; uint64_t id; uint32_t pid, tid; uint64_t address; uint32_t cpu, ret; }; int Channel::readSample(Sample* sample) { if(m_fd < 0) ERROR({}, -EINVAL, false, "this Channel has not bound yet"); // the header auto* meta = (struct perf_event_mmap_page*)m_buffer; uint64_t tail = meta->data_tail; uint64_t head = meta->data_head; READ_MEMORY_BARRIER(); assert(tail <= head); if(tail == head) return -EAGAIN; bool available = false; while(tail < head) { // the data_head and data_tail never wrap, they are logical uint64_t position = tail % (PAGE_SIZE * RING_BUFFER_PAGES); auto* entry = (struct perf_sample*)((char*)m_buffer + PAGE_SIZE + position); tail += entry->header.size; // read the record if(entry->header.type == PERF_RECORD_SAMPLE && entry->id == m_id && #RED// this line is to filter the wrong pid caused by kernel bug#-RED entry->pid == m_pid) { sample->type = m_type; sample->cpu = entry->cpu; sample->pid = entry->pid; sample->tid = entry->tid; sample->address = entry->address; available = true; break; } } assert(tail <= head); // update data_tail to notify kernel write new data meta->data_tail = tail; return available ? 0 : -EAGAIN; } pid_t Channel::getPid() { return m_pid; } Channel::Type Channel::getType() { return m_type; } int Channel::getPerfFd() { return m_fd; } ---code 这个class Channel实现了完整的功能。可以写一个测试程序test_channel.cpp如下: +++code #include "channel.h" int main(int argc, char* argv[]) { unsigned long period; pid_t pid; if(argc != 3 || sscanf(argv[1], "%lu", &period) != 1 || sscanf(argv[2], "%d", &pid) != 1) { printf("USAGE: %s <period> <pid>\n", argv[0]); return 1; } Channel c; int ret = c.bind(pid, Channel::CHANNEL_STORE); if(ret) return ret; ret = c.setPeriod(period); if(ret) return ret; while(true) { Channel::Sample sample; ret = c.readSample(&sample); if(ret == -EAGAIN) { usleep(10000); continue; } else if(ret < 0) return ret; else printf("type: %x, cpu: %u, pid: %u, tid: %u, address: %lx\n", sample.type, sample.cpu, sample.pid, sample.tid, sample.address); } return 0; } ---code 编译命令: +++code g++ -std=gnu++11 channel.cpp test_channel.cpp -o test_channel ---code 使用命令: +++code ./test_channel 10000 <pid> ---code 就可以不停输出指定进程STORE指令的采样了。 2.png =======================阶段二:信道集ChannelSet======================== 有了单个信道之后,就需要封装一个集合,里面可以增删信道,还能在集合上做poll()操作,即获取新的采样。 这个ChannelSet在初始化时,需要指定集合监测哪些类型的事件。即传入一个Channel::Type的集合。假设初始化时指定了CHANNEL_LOAD和CHANNEL_STORE,那么以后每一个新加入集合的pid,都会新建两个信道。 channelset.h +++code #ifndef CHANNELSET_H #define CHANNELSET_H #include "common.h" #include "channel.h" #include <set> #include <vector> class ChannelSet { public: ChannelSet(); ~ChannelSet(); /* Initialize the ChannelSet. * types: set of Channel::Type to sample * RETURN: 0 if ok, or a negative error code */ int init(std::set<Channel::Type>& types); /* Uninitialize the ChannelSet. */ void deinit(); /* Add a process into the ChannelSet. * pid: the pid of the new process * RETURN: 0 if ok (either newly added or alreadly existed), or a negative error code * NOTE: Channel(s) will automatically be created for this process */ int add(pid_t pid); /* Remove a process out from the ChannelSet. * pid: the pid of the process * RETURN: 0 if ok (either actually removed or never existed), or a negative error code * NOTE: Channel(s) will automatically be destroyed for this process */ int remove(pid_t pid); /* Update the processes in this ChannelSet. * pids: the set of processes * RETURN: 0 if ok, or a negative error code * NOTE: For any process that is in this ChannelSet but not in <pids> will be removed, * while for any process that is not in this ChannelSet but in <pids> will be added. */ int update(std::set<pid_t>& pids); /* Set the period of all Channels. * period: the new period to sample * RETURN: 0 if ok, or a negative error code */ int setPeriod(unsigned long period); /* Poll samples from Channels. * timeout: the number of milliseconds to block. * -1 causes to block indefinitely until any sample is available, * while 0 causes to return immediately, even if no samples are available. * privdata: the user-defined argument passed to <func>, see below * on_sample: the callback function to handle with each sample * on_exit: the callback function to handle with exited process * RETURN: the count of samples handled, or a negative error code * NOTE: Designing in callback style is to reduce the overhead of data copy. */ ssize_t pollSamples(int timeout, void* privdata, void (*on_sample)(void* privdata, Channel::Sample* sample), void (*on_exit)(void* privdata, pid_t pid)); private: struct Entry { pid_t pid; // pid of this process Channel* channels; // Channels for this process bool operator <(const Entry& entry) const { return pid < entry.pid; } }; int createChannels(Channel** pchannels, pid_t pid); void destroyChannels(Channel* channels, ssize_t epoll_count = -1); private: std::vector<Channel::Type> m_types; // types to sample (of Channels for each process) std::set<Entry> m_entries; // set of processes and its Channels unsigned long m_period; // the sample_period of all Channels int m_epollfd; // the file descriptor from epoll_create() }; #endif ---code channelset.cpp +++code #include "channelset.h" #include <list> #include <sys/epoll.h> #define EPOLL_BATCH_SIZE 64 ChannelSet::ChannelSet() { m_epollfd = -1; } ChannelSet::~ChannelSet() { deinit(); } int ChannelSet::init(std::set<Channel::Type>& types) { if(m_epollfd >= 0) ERROR({}, -EINVAL, false, "this ChannelSet has been initialized already"); if(types.size() == 0) ERROR({}, -EINVAL, false, "param <types> is empty"); int fd = epoll_create(1); if(fd < 0) { int ret = -errno; ERROR({}, ret, true, "epoll_create(1) failed: "); } m_types.insert(m_types.begin(), types.begin(), types.end()); m_period = 0; m_epollfd = fd; return 0; } void ChannelSet::deinit() { if(m_epollfd < 0) return; m_types.clear(); for(auto it = m_entries.begin(); it != m_entries.end(); ++it) destroyChannels(it->channels); m_entries.clear(); close(m_epollfd); m_epollfd = -1; } int ChannelSet::createChannels(Channel** pchannels, pid_t pid) { size_t count = m_types.size(); auto* channels = new Channel[count]; for(size_t i = 0; i < count; i++) { Channel* channel = channels + i; Channel::Type type = m_types[i]; int ret = channel->bind(pid, type); if(ret < 0) ERROR(destroyChannels(channels, i), ret, false, "channels[%lu].bind(%d, %d) failed", i, pid, type); ret = channel->setPeriod(m_period); if(ret < 0) ERROR(destroyChannels(channels, i), ret, false, "channels[%lu].setPeriod(%lu) failed", i, m_period); // add channel to epoll struct epoll_event event; // if any sample available, EPOLLIN is sent, if process exits, EPOLLHUP is sent. event.events = EPOLLIN | EPOLLHUP; // we can get Channal after epoll_wait() event.data.ptr = channel; int fd = channel->getPerfFd(); ret = epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &event); if(ret) ERROR(destroyChannels(channels, i), ret, true, "epoll_ctl(%d, EPOLL_CTL_ADD, %d, &evt) failed: ", m_epollfd, fd); } (*pchannels) = channels; return 0; } void ChannelSet::destroyChannels(Channel* channels, ssize_t epoll_count) { // channels that added to epoll should be deleted size_t count = epoll_count >= 0 ? epoll_count : m_types.size(); for(size_t i = 0; i < count; i++) { Channel* channel = channels + i; int ret = epoll_ctl(m_epollfd, EPOLL_CTL_DEL, channel->getPerfFd(), NULL); assert(ret == 0); } delete[] channels; } int ChannelSet::add(pid_t pid) { if(m_epollfd < 0) ERROR({}, -EINVAL, false, "this ChannelSet has not been initialized yet"); Entry entry; entry.pid = pid; auto it = m_entries.find(entry); // already existed if(it != m_entries.end()) return 0; int ret = createChannels(&(entry.channels), pid); if(ret < 0) ERROR({}, ret, false, "createChannels(&(entry.channels), %d) failed", pid); m_entries.insert(entry); return 0; } int ChannelSet::remove(pid_t pid) { if(m_epollfd < 0) ERROR({}, -EINVAL, false, "this ChannelSet has not been initialized yet"); Entry entry; entry.pid = pid; auto it = m_entries.find(entry); // not existed if(it == m_entries.end()) return 0; destroyChannels(it->channels); m_entries.erase(it); return 0; } int ChannelSet::update(std::set<pid_t>& pids) { if(m_epollfd < 0) ERROR({}, -EINVAL, false, "this ChannelSet has not been initialized yet"); // now we diff the two sets auto pid_it = pids.begin(), pid_end = pids.end(); auto entry_it = m_entries.begin(), entry_end = m_entries.end(); // to_adds = pids - m_entries.pid, to_removes = m_entries.pid - pids std::list<pid_t> to_adds, to_removes; while(pid_it != pid_end && entry_it != entry_end) { if((*pid_it) < entry_it->pid) { to_adds.push_back(*pid_it); ++pid_it; } else if((*pid_it) > entry_it->pid) { to_removes.push_back(entry_it->pid); ++entry_it; } else { ++pid_it; ++entry_it; } } to_adds.insert(to_adds.end(), pid_it, pid_end); for(; entry_it != entry_end; ++entry_it) to_removes.push_back(entry_it->pid); // remove pids that not in <pids> for(auto it = to_removes.begin(); it != to_removes.end(); ++it) { Entry fake; fake.pid = (*it); auto found = m_entries.find(fake); assert(found != m_entries.end()); destroyChannels(found->channels); m_entries.erase(found); } // add new pids in <pids> for(auto it = to_adds.begin(); it != to_adds.end(); ++it) { Entry entry; entry.pid = (*it); int ret = createChannels(&(entry.channels), entry.pid); if(ret < 0) ERROR({}, ret, false, "createChannels(&(entry.channels), %d) failed", entry.pid); m_entries.insert(entry); } return 0; } int ChannelSet::setPeriod(unsigned long period) { if(m_epollfd < 0) ERROR({}, -EINVAL, false, "this ChannelSet has not been initialized yet"); size_t count = m_types.size(); for(auto it = m_entries.begin(); it != m_entries.end(); ++it) { Channel* channels = it->channels; for(size_t i = 0; i < count; i++) { int ret = channels[i].setPeriod(period); if(ret < 0) ERROR({}, ret, false, "channels[%lu].setPeriod(%lu) failed", i, period); } } m_period = period; return 0; } ssize_t ChannelSet::pollSamples(int timeout, void* privdata, void (*on_sample)(void* privdata, Channel::Sample* sample), void (*on_exit)(void* privdata, pid_t pid)) { if(m_epollfd < 0) ERROR({}, -EINVAL, false, "this ChannelSet has not been initialized yet"); // poll available channels struct epoll_event events[EPOLL_BATCH_SIZE]; int ret = epoll_wait(m_epollfd, events, EPOLL_BATCH_SIZE, timeout); if(ret < 0) { ret = -errno; ERROR({}, ret, true, "epoll_wait(%d, events, %d, %d) failed: ", m_epollfd, EPOLL_BATCH_SIZE, timeout); } // count of active channel int channel_count = ret; // count of available samples ssize_t sample_count = 0; // exited processes std::set<pid_t> exit_pids; // for each active channel for(int i = 0; i < channel_count; i++) { auto reason = events[i].events; auto* channel = (Channel*)events[i].data.ptr; // process exits if(reason & EPOLLHUP) { exit_pids.insert(channel->getPid()); break; } // process has new samples assert(reason == EPOLLIN); // read all available samples while(true) { Channel::Sample sample; ret = channel->readSample(&sample); if(ret == -EAGAIN) break; if(ret < 0) ERROR({}, ret, false, "channel->readSample(&sample) failed"); if(on_sample) on_sample(privdata, &sample); sample_count++; } } for(auto it = exit_pids.begin(); it != exit_pids.end(); ++it) { Entry entry; entry.pid = (*it); auto found = m_entries.find(entry); assert(found != m_entries.end()); destroyChannels(found->channels); m_entries.erase(found); if(on_exit) on_exit(privdata, entry.pid); } return sample_count; } ---code 可以看到,进程是可以动态增删的。写一个test_channelset.cpp如下: +++code #include "channelset.h" void on_sample(void* privdata, Channel::Sample* sample) { printf("type: %x, cpu: %u, pid: %u, tid: %u, address: %lx\n", sample->type, sample->cpu, sample->pid, sample->tid, sample->address); } int main(int argc, char* argv[]) { unsigned long period; if(argc < 3 || sscanf(argv[1], "%lu", &period) != 1) { wrong_arguments: printf("USAGE: %s <period> <pid1> <pid2> ...\n", argv[0]); return 1; } std::set<pid_t> pids; for(int i = 2; i < argc; i++) { pid_t pid; if(sscanf(argv[i], "%d", &pid) != 1) goto wrong_arguments; pids.insert(pid); } ChannelSet cs; std::set<Channel::Type> types; types.insert(Channel::CHANNEL_LOAD); types.insert(Channel::CHANNEL_STORE); int ret = cs.init(types); if(ret) return ret; ret = cs.setPeriod(period); if(ret) return ret; ret = cs.update(pids); if(ret) return ret; size_t total = 0; while(true) { ssize_t ret = cs.pollSamples(1000, NULL, on_sample); if(ret < 0) return (int)ret; total += ret; printf("count: %ld, total: %lu\n", ret, total); } return 0; } ---code 编译命令: +++code g++ -std=gnu++11 channel.cpp channelset.cpp test_channelset.cpp -o test_channelset ---code 使用命令: +++code ./test_channelset 10000 <pid1> <pid2> <pid3> ---code 就可以不停输出多个指定进程LOAD和STORE指令的采样了。 3.png