获取虚拟页所在的NUMA节点
在涉及到NUMA balancing的需求中,通常都需要获知一个虚拟页所在的NUMA节点,从而做页迁移的决策。
单单就获取NUMA节点而言,用move_pages()即可。move_pages()虽然主要用于页迁移,但是还有附带的功能,即获取所在NUMA node。
1.jpg
也就是说,只要把nodes数组设为NULL,move_pages()就会把pages数组中各个页所在的节点号放到status数组中。
如果我们要实现一个类,可以通过虚地址获取节点,那么为了性能,需要考虑如下两点:
#OL
#LI需要有缓存机制,即对于已经通过move_pages()获取过节点的页,下次访问直接从缓存中获得;#-LI
#LI调用move_pages(),尽量使用批量处理(batch),即多个(比如256)个页的信息通过一次move_pages()调用获得。#-LI
#-OL
既然引入了缓存机制,那么还需要有“失效”机制,即用户可以通知该类把某一范围内的缓存信息丢弃掉。比如用户通过move_pages()做了页迁移之后,就可以通过该接口迫使该类重新获取信息。
这个功能很简单。但是为了避免以后重复造轮子,我就贴出一个实现,以后可以直接复用。common.h可从《#HREF"../基于perf的内存访问采样/index.html"#-HREF1基于perf的内存访问采样#-HREF2》中获得。
pagenode.h
+++code
#ifndef PAGENODE_H
#define PAGENODE_H

#include "common.h"

#include <set>
#include <unistd.h>

class PageNode
{
public:
    PageNode();

    ~PageNode();

    /* Initialized the PageNode to bind to a process.
     *      pid: the PID of target process
     * RETURN: 0 if ok, or a negative error code
     */
    int bind(pid_t pid);

    /* Uninitialized the PageNode. 
     */
    void unbind();

    /* Get the NUMA node id where the page resides.
     *      address: the virtual address of the page
     *      force: if the cached node is '-1', then re-fetch the node information
     * RETURN: the node id, or 255 if page is unavailable, or a negetive error code 
     */
    int where(uint64_t address, bool force = false);

    /* Invalidate the cached information.
     *      start: the start address
     *      end: the end address
     * RETURN: 0 if ok, or a negative error
     * NOTE: the cached information in range [start, end) is cleared 
     */
    int invalidate(uint64_t start = 0, uint64_t end = 0xffffffffffffffffUL);

private:

    struct Segment
    {
        uint64_t start;
        uint8_t* info;

        bool operator <(const Segment& segment) const
        {
            return start < segment.start;
        }
    };

    int createInfo(Segment* segment);

    int getNode(uint64_t address);

private:
    pid_t m_pid;
    std::set<Segment> m_segments;
};

#endif
---code
pagenode.cpp
+++code
#include "pagenode.h"

#include <signal.h>
#include <numaif.h>

#define PAGE_SIZE                       4096

// one segment manage 128MB virtual space
#define SEGMENT_SIZE                    (128UL << 20)
#define ALIGN_TO_SEGMENT_START(addr)    ((addr) & (~(SEGMENT_SIZE - 1)))
// a 4K page is managed by a info of 4 bit, 128MB needs 16384B to manage
#define SEGMENT_INFO_SIZE               (SEGMENT_SIZE / PAGE_SIZE / 2)

// batch size to call move_pages()
#define GET_NUMA_NODE_BATCH_SIZE        512
// max count of numa node
#define MAX_NUMA_NODE                   12

PageNode::PageNode()
{
    m_pid = -1;
}

PageNode::~PageNode()
{
    unbind();
}

int PageNode::bind(pid_t pid)
{
    if(m_pid >= 0)
        ERROR({}, -EINVAL, false, "this PageNode has bound already");
    int ret = kill(pid, 0);
    if(ret)
        ERROR({}, -ESRCH, false, "no such process whose pid is %d", pid);
    m_pid = pid;
    return 0;
}

void PageNode::unbind()
{
    if(m_pid < 0)
        return;
    for(auto it = m_segments.begin(); it != m_segments.end(); ++it)
        delete it->info;
    m_segments.clear();
    m_pid = -1;
}

static void setInfo(uint8_t* info, size_t index, int status)
{
    assert((status & 0xf0) == 0);
    size_t byte_index = index / 2;
    int which = index % 2;
    uint8_t& byte = info[byte_index];
    // each info is 4 bits
    if(which == 0)
    {
        byte &= 0xf;
        byte |= status;
    }
    else
    {
        byte &= 0xf0;
        byte |= status << 4;
    }
}

static int getInfo(uint8_t* info, size_t index)
{
    size_t byte_index = index / 2;
    int which = index % 2;
    uint8_t byte = info[byte_index];
    if(which == 0)
        return byte & 0xf;
    else
        return (byte >> 4) & 0xf;
}

// init the <info> field of Segment
int PageNode::createInfo(Segment* segment)
{
    segment->info = new uint8_t[SEGMENT_INFO_SIZE];
    memset(segment->info, 0xff, SEGMENT_INFO_SIZE);
    size_t info_count = 0;
    void* address[GET_NUMA_NODE_BATCH_SIZE];
    int status[GET_NUMA_NODE_BATCH_SIZE];
    size_t batch_size = 0;
    uint64_t addr = segment->start;
    uint64_t end = addr + SEGMENT_SIZE;
    while(addr < end)
    {
        while(addr < end)
        {
            address[batch_size++] = (void*)addr;
            addr += PAGE_SIZE;
            if(batch_size == GET_NUMA_NODE_BATCH_SIZE)
                break;
        }
        int ret = move_pages(m_pid, batch_size, address, NULL, status, MPOL_MF_MOVE);
        if(ret)
        {
            ret = -errno;
            ERROR({}, ret, true,
                "move_pages(%d, %lu, address, NULL, status, MPOL_MF_MOVE) failed: ",
                    m_pid, batch_size);
        }
        for(size_t i = 0; i < batch_size; i++)
        {
            int s = status[i];
            if(s < 0 || s >= MAX_NUMA_NODE)
                s = 0xf;
            setInfo(segment->info, info_count, s);
            info_count++;
        }
        batch_size = 0;
    }
    assert(info_count == SEGMENT_INFO_SIZE * 2);
    return 0;
}

int PageNode::getNode(uint64_t address)
{
    void* addr = (void*)address;
    int status;
    int ret = move_pages(m_pid, 1, &addr, NULL, &status, MPOL_MF_MOVE);
    if(ret)
    {
        ret = -errno;
        ERROR({}, ret, true,
            "move_pages(%d, 1, &addr, NULL, &status, MPOL_MF_MOVE) failed: ", m_pid);
    }
    if(status < 0 || status >= MAX_NUMA_NODE)
        status = 0xf;
    return status;
}

int PageNode::where(uint64_t address, bool force)
{
    if(m_pid < 0)
        ERROR({}, -EINVAL, false, "this PageNode has not been bound yet");
    Segment segment;
    // search segment by <start>
    segment.start = ALIGN_TO_SEGMENT_START(address);
    auto it = m_segments.find(segment);
    // if already in set, then get it
    if(it != m_segments.end())
        segment.info = it->info;
    // otherwise build the segment and insert it
    else
    {
        int ret = createInfo(&segment);
        if(ret)
            ERROR({}, ret, false, "createInfo(&segment) failed");
        m_segments.insert(segment);
    }
    assert(address >= segment.start);
    // index of byte of the info structure
    size_t index = (address - segment.start) / PAGE_SIZE;
    int status = getInfo(segment.info, index);
    if(status != 0xf)
        return status;
    if(!force)
        return 255;
    // get the current node of this page
    status = getNode(address);
    setInfo(segment.info, index, status);
    if(status == 0xf)
        return 255;
    return status;
}

int PageNode::invalidate(uint64_t start, uint64_t end)
{
    if(m_pid < 0)
        ERROR({}, -EINVAL, false, "this PageNode has not been bound yet");
    if(m_segments.size() == 0)
        return 0;
    Segment fake;
    fake.start = start;
    auto it = m_segments.lower_bound(fake);
    if(it != m_segments.begin())
    {
        auto prev = it;
        --prev;
        if(prev->start + SEGMENT_SIZE > start)
            it = prev;
    }
    assert(start < it->start + SEGMENT_SIZE);
    while(it != m_segments.end())
    {
        if(it->start >= end)
            break;
        delete it->info;
        m_segments.erase(it++);
    }
    return 0;
}
---code
可以写一个测试用例test.cpp,其中的Channel依赖《#HREF"../基于perf的内存访问采样/index.html"#-HREF1基于perf的内存访问采样#-HREF2》中的channel.h和channel.cpp:
+++code
#include "channel.h"
#include "pagenode.h"

#include <time.h>

int main(int argc, char* argv[])
{
    unsigned long period;
    pid_t pid;
    if(argc != 3 ||
        sscanf(argv[1], "%lu", &period) != 1 ||
        sscanf(argv[2], "%d", &pid) != 1)
    {
        printf("USAGE: %s <period> <pid>\n", argv[0]);
        return 1;
    }
    Channel c;
    int ret = c.bind(pid, Channel::CHANNEL_STORE);
    if(ret)
        return ret;
    ret = c.setPeriod(period);
    if(ret)
        return ret;
    PageNode pn;
    ret = pn.bind(pid);
    if(ret)
        return ret;
    int last_time = time(NULL);
    while(true)
    {
        Channel::Sample sample;
        ret = c.readSample(&sample);
        if(ret == -EAGAIN)
        {
            usleep(10000);
            continue;
        }
        else if(ret < 0)
            return ret;
        int node = pn.where(sample.address, true);
        printf("type: %x, cpu: %u, pid: %u, tid: %u, address: %lx, node: %d\n",
            sample.type, sample.cpu, sample.pid, sample.tid, sample.address, node);
        int now = time(NULL);
        if(now - last_time > 10)
        {
            pn.invalidate();
            last_time = now;
        }
    }
    return 0;
}
---code
可以看出,代码中每10秒使所有缓存失效。记得编译时加上-lnuma:
+++code
g++ -std=gnu++11 channel.cpp pagenode.cpp test.cpp -o test -lnuma -O2 -Wall
---code
假设要监测的进程pid是16351,那么这样启动:
+++code
./test 100000 16351
---code
2.png