算法实现/树/B+树
在计算机科学中,B+ 树 是一种树形数据结构。它以一种允许高效插入和删除元素的方式表示排序数据。它是一个动态的、多级索引,对每个节点中的键数有最大和最小界限。
B+ 树是 B 树的一种变体。在 B+ 树中,与 B 树不同的是,所有数据都保存在叶子节点中。内部节点只包含键和树指针。所有叶子节点都在同一最低级别。叶子节点也作为链表连接在一起,以方便范围查询。
记录中键的最大数量称为 B+ 树的阶数。
每个记录中键的最小数量是键的最大数量的 1/2。例如,如果 B+ 树的阶数为 n,则每个节点(根节点除外)必须有 n/2 到 n 个键。
可以使用 B+ 树索引的键的数量是树的阶数及其高度的函数。
对于一个高度为 h 的 n 阶 B+ 树
- 键的最大数量是
- 键的最小数量是
B+ 树首次在论文“Rudolf Bayer, Edward M. McCreight: Organization and Maintenance of Large Ordered Indices. Acta Informatica 1: 173-189 (1972)” 中描述。
小心:以下代码示例没有实现正确的 B+ 树。 虽然它们与 B+ 树非常相似,但它们没有满足 B+ 树标准(正如作者在一些注释中承认的那样)。
此代码片段已在 32 位 x86 计算机上的 Linux 上进行测试。尚未实现键的删除。这可以通过一种懒惰的方式很容易地完成,其摊销成本为 O(log n),方法是在每次删除的键数量与未删除的键数量相同时从头开始重建树。重建可以在 O(n) 时间内完成,因此其摊销成本仅为 O(1)。但是,这种方法不适合实时系统。
该实现使用 Boost 库来进行编译时断言和高效的内存分配。后者可以使用 new 运算符
代替,这会导致一些性能损失。
/* Copyright (C) 2006 David Garcia
* You may use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of this software subject to the following conditions:
* THIS SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE
* OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#if !defined BPLUSTREE_HPP_227824
#define BPLUSTREE_HPP_227824
// This is required for glibc to define std::posix_memalign
#if !defined(_XOPEN_SOURCE) || (_XOPEN_SOURCE < 600)
#define _XOPEN_SOURCE 600
#endif
#include <stdlib.h>
#include <assert.h>
#include <boost/static_assert.hpp>
#include <boost/pool/object_pool.hpp>
// DEBUG
#include <iostream>
using std::cout;
using std::endl;
#if defined DEBUG_ASSERT
# warning "DEBUG_ASSERT overloaded"
#else
# if defined DEBUG
# define DEBUG_ASSERT(expr) assert(expr)
# else
# define DEBUG_ASSERT(expr)
# endif
#endif
template <typename KEY, typename VALUE, unsigned N, unsigned M,
unsigned INNER_NODE_PADDING= 0, unsigned LEAF_NODE_PADDING= 0,
unsigned NODE_ALIGNMENT= 64>
class BPlusTree
{
public:
// N must be greater than two to make the split of
// two inner nodes sensible.
BOOST_STATIC_ASSERT(N>2);
// Leaf nodes must be able to hold at least one element
BOOST_STATIC_ASSERT(M>0);
// Builds a new empty tree.
BPlusTree()
: depth(0),
root(new_leaf_node())
{
// DEBUG
// cout << "sizeof(LeafNode)==" << sizeof(LeafNode) << endl;
// cout << "sizeof(InnerNode)==" << sizeof(InnerNode) << endl;
}
~BPlusTree() {
// Empty. Memory deallocation is done automatically
// when innerPool and leafPool are destroyed.
}
// Inserts a pair (key, value). If there is a previous pair with
// the same key, the old value is overwritten with the new one.
void insert(KEY key, VALUE value) {
InsertionResult result;
bool was_split;
if( depth == 0 ) {
// The root is a leaf node
DEBUG_ASSERT( *reinterpret_cast<NodeType*>(root) ==
NODE_LEAF);
was_split= leaf_insert(reinterpret_cast<LeafNode*>
(root), key, value, &result);
} else {
// The root is an inner node
DEBUG_ASSERT( *reinterpret_cast<NodeType*>
(root) == NODE_INNER );
was_split= inner_insert(reinterpret_cast<InnerNode*>
(root), depth, key, value, &result);
}
if( was_split ) {
// The old root was splitted in two parts.
// We have to create a new root pointing to them
depth++;
root= new_inner_node();
InnerNode* rootProxy=
reinterpret_cast<InnerNode*>(root);
rootProxy->num_keys= 1;
rootProxy->keys[0]= result.key;
rootProxy->children[0]= result.left;
rootProxy->children[1]= result.right;
}
}
// Looks for the given key. If it is not found, it returns false,
// if it is found, it returns true and copies the associated value
// unless the pointer is null.
bool find(const KEY& key, VALUE* value= 0) const {
InnerNode* inner;
register void* node= root;
register unsigned d= depth, index;
while( d-- != 0 ) {
inner= reinterpret_cast<InnerNode*>(node);
DEBUG_ASSERT( inner->type == NODE_INNER );
index= inner_position_for(key, inner->keys,
inner->num_keys);
node= inner->children[index];
}
LeafNode* leaf= reinterpret_cast<LeafNode*>(node);
DEBUG_ASSERT( leaf->type == NODE_LEAF );
index= leaf_position_for(key, leaf->keys, leaf->num_keys);
if( leaf->keys[index] == key ) {
if( value != 0 ) {
*value= leaf->values[index];
}
return true;
} else {
return false;
}
}
// Returns the size of an inner node
// It is useful when optimizing performance with cache alignment.
unsigned sizeof_inner_node() const {
return sizeof(InnerNode);
}
// Returns the size of a leaf node.
// It is useful when optimizing performance with cache alignment.
unsigned sizeof_leaf_node() const {
return sizeof(LeafNode);
}
private:
// Used when debugging
enum NodeType {NODE_INNER=0xDEADBEEF, NODE_LEAF=0xC0FFEE};
// Leaf nodes store pairs of keys and values.
struct LeafNode {
#ifdef DEBUG
LeafNode() : type(NODE_LEAF), num_keys(0) {}
const NodeType type;
#else
LeafNode() : num_keys(0) {}
#endif
unsigned num_keys;
KEY keys[M];
VALUE values[M];
unsigned char _pad[LEAF_NODE_PADDING];
};
// Inner nodes store pointers to other nodes interleaved with keys.
struct InnerNode {
#ifdef DEBUG
InnerNode() : type(NODE_INNER), num_keys(0) {}
const NodeType type;
#else
InnerNode() : num_keys(0) {}
#endif
unsigned num_keys;
KEY keys[N];
void* children[N+1];
unsigned char _pad[INNER_NODE_PADDING];
};
// Custom allocator that returns aligned blocks of memory
template <unsigned ALIGNMENT>
struct AlignedMemoryAllocator {
typedef std::size_t size_type;
typedef std::ptrdiff_t difference_type;
static char* malloc(const size_type bytes)
{
void* result;
if( posix_memalign(&result, ALIGNMENT, bytes) != 0 ) {
result= 0;
}
// Alternative: result= std::malloc(bytes);
return reinterpret_cast<char*>(result);
}
static void free(char* const block)
{ std::free(block); }
};
// Returns a pointer to a fresh leaf node.
LeafNode* new_leaf_node() {
LeafNode* result;
//result= new LeafNode();
result= leafPool.construct();
//cout << "New LeafNode at " << result << endl;
return result;
}
// Frees a leaf node previously allocated with new_leaf_node()
void delete_leaf_node(LeafNode* node) {
DEBUG_ASSERT( node->type == NODE_LEAF );
//cout << "Deleting LeafNode at " << node << endl;
// Alternatively: delete node;
leafPool.destroy(node);
}
// Returns a pointer to a fresh inner node.
InnerNode* new_inner_node() {
InnerNode* result;
// Alternatively: result= new InnerNode();
result= innerPool.construct();
//cout << "New InnerNode at " << result << endl;
return result;
}
// Frees an inner node previously allocated with new_inner_node()
void delete_inner_node(InnerNode* node) {
DEBUG_ASSERT( node->type == NODE_INNER );
//cout << "Deleting InnerNode at " << node << endl;
// Alternatively: delete node;
innerPool.destroy(node);
}
// Data type returned by the private insertion methods.
struct InsertionResult {
KEY key;
void* left;
void* right;
};
// Returns the position where 'key' should be inserted in a leaf node
// that has the given keys.
static unsigned leaf_position_for(const KEY& key, const KEY* keys,
unsigned num_keys) {
// Simple linear search. Faster for small values of N or M
unsigned k= 0;
while((k < num_keys) && (keys[k]<key)) {
++k;
}
return k;
/*
// Binary search. It is faster when N or M is > 100,
// but the cost of the division renders it useless
// for smaller values of N or M.
XXX--- needs to be re-checked because the linear search
has changed since the last update to the following ---XXX
int left= -1, right= num_keys, middle;
while( right -left > 1 ) {
middle= (left+right)/2;
if( keys[middle] < key) {
left= middle;
} else {
right= middle;
}
}
//assert( right == k );
return unsigned(right);
*/
}
// Returns the position where 'key' should be inserted in an inner node
// that has the given keys.
static unsigned inner_position_for(const KEY& key, const KEY* keys,
unsigned num_keys) {
// Simple linear search. Faster for small values of N or M
unsigned k= 0;
while((k < num_keys) && ((keys[k]<key) || (keys[k]==key))) {
++k;
}
return k;
// Binary search is faster when N or M is > 100,
// but the cost of the division renders it useless
// for smaller values of N or M.
}
bool leaf_insert(LeafNode* node, KEY& key,
VALUE& value, InsertionResult* result) {
DEBUG_ASSERT( node->type == NODE_LEAF );
assert( node->num_keys <= M );
bool was_split= false;
// Simple linear search
unsigned i= leaf_position_for(key, node->keys, node->num_keys);
if( node->num_keys == M ) {
// The node was full. We must split it
unsigned treshold= (M+1)/2;
LeafNode* new_sibling= new_leaf_node();
new_sibling->num_keys= node->num_keys -treshold;
for(unsigned j=0; j < new_sibling->num_keys; ++j) {
new_sibling->keys[j]= node->keys[treshold+j];
new_sibling->values[j]=
node->values[treshold+j];
}
node->num_keys= treshold;
if( i < treshold ) {
// Inserted element goes to left sibling
leaf_insert_nonfull(node, key, value, i);
} else {
// Inserted element goes to right sibling
leaf_insert_nonfull(new_sibling, key, value,
i-treshold);
}
// Notify the parent about the split
was_split= true;
result->key= new_sibling->keys[0];
result->left= node;
result->right= new_sibling;
} else {
// The node was not full
leaf_insert_nonfull(node, key, value, i);
}
return was_split;
}
static void leaf_insert_nonfull(LeafNode* node, KEY& key, VALUE& value,
unsigned index) {
DEBUG_ASSERT( node->type == NODE_LEAF );
assert( node->num_keys < M );
assert( index <= M );
assert( index <= node->num_keys );
if( (index < M) && (node->num_keys!=0) && (node->keys[index] == key) ) {
// We are inserting a duplicate value.
// Simply overwrite the old one
node->values[index]= value;
} else {
// The key we are inserting is unique
for(unsigned i=node->num_keys; i > index; --i) {
node->keys[i]= node->keys[i-1];
node->values[i]= node->values[i-1];
}
node->num_keys++;
node->keys[index]= key;
node->values[index]= value;
}
}
bool inner_insert(InnerNode* node, unsigned depth, KEY& key,
VALUE& value, InsertionResult* result) {
DEBUG_ASSERT( node->type == NODE_INNER );
assert( depth != 0 );
// Early split if node is full.
// This is not the canonical algorithm for B+ trees,
// but it is simpler and does not break the definition.
bool was_split= false;
if( node->num_keys == N ) {
// Split
unsigned treshold= (N+1)/2;
InnerNode* new_sibling= new_inner_node();
new_sibling->num_keys= node->num_keys -treshold;
for(unsigned i=0; i < new_sibling->num_keys; ++i) {
new_sibling->keys[i]= node->keys[treshold+i];
new_sibling->children[i]=
node->children[treshold+i];
}
new_sibling->children[new_sibling->num_keys]=
node->children[node->num_keys];
node->num_keys= treshold-1;
// Set up the return variable
was_split= true;
result->key= node->keys[treshold-1];
result->left= node;
result->right= new_sibling;
// Now insert in the appropriate sibling
if( key < result->key ) {
inner_insert_nonfull(node, depth, key, value);
} else {
inner_insert_nonfull(new_sibling, depth, key,
value);
}
} else {
// No split
inner_insert_nonfull(node, depth, key, value);
}
return was_split;
}
void inner_insert_nonfull(InnerNode* node, unsigned depth, KEY& key,
VALUE& value) {
DEBUG_ASSERT( node->type == NODE_INNER );
assert( node->num_keys < N );
assert( depth != 0 );
// Simple linear search
unsigned index= inner_position_for(key, node->keys,
node->num_keys);
InsertionResult result;
bool was_split;
if( depth-1 == 0 ) {
// The children are leaf nodes
for(unsigned kk=0; kk < node->num_keys+1; ++kk) {
DEBUG_ASSERT( *reinterpret_cast<NodeType*>
(node->children[kk]) == NODE_LEAF );
}
was_split= leaf_insert(reinterpret_cast<LeafNode*>
(node->children[index]), key, value, &result);
} else {
// The children are inner nodes
for(unsigned kk=0; kk < node->num_keys+1; ++kk) {
DEBUG_ASSERT( *reinterpret_cast<NodeType*>
(node->children[kk]) == NODE_INNER );
}
InnerNode* child= reinterpret_cast<InnerNode*>
(node->children[index]);
was_split= inner_insert( child, depth-1, key, value,
&result);
}
if( was_split ) {
if( index == node->num_keys ) {
// Insertion at the rightmost key
node->keys[index]= result.key;
node->children[index]= result.left;
node->children[index+1]= result.right;
node->num_keys++;
} else {
// Insertion not at the rightmost key
node->children[node->num_keys+1]=
node->children[node->num_keys];
for(unsigned i=node->num_keys; i!=index; --i) {
node->children[i]= node->children[i-1];
node->keys[i]= node->keys[i-1];
}
node->children[index]= result.left;
node->children[index+1]= result.right;
node->keys[index]= result.key;
node->num_keys++;
}
} // else the current node is not affected
}
typedef AlignedMemoryAllocator<NODE_ALIGNMENT> AlignedAllocator;
// Node memory allocators. IMPORTANT NOTE: they must be declared
// before the root to make sure that they are properly initialised
// before being used to allocate any node.
boost::object_pool<InnerNode, AlignedAllocator> innerPool;
boost::object_pool<LeafNode, AlignedAllocator> leafPool;
// Depth of the tree. A tree of depth 0 only has a leaf node.
unsigned depth;
// Pointer to the root node. It may be a leaf or an inner node, but
// it is never null.
void* root;
};
#endif // !defined BPLUSTREE_HPP_227824
在将其转换为 Java 代码后,我意识到以下代码中执行的 splitInsertInner() 违反了 B+ 树的定义。例如,如果 N = 4,那么在拆分之后,我们最终得到 3 个节点,根节点包含 1 个键,原始节点包含 1 个键,而兄弟节点包含 3 个键。因此它(原始节点)至少违反了内部节点中至少半满的 B+ 树规则。看起来我们仍然需要规范算法才能将其纠正。这里没有捷径。因此,此算法不是严格的 B+ 树,但它可以工作。无论如何,我将上面的 C++ 代码转换为下面的 Java 代码。
它实际上不起作用,因为不能通过从新创建的具体祖先(如 Object)数组进行强制转换来创建泛型数组。
例如。T[] a = new T[10] 不起作用。请参见 LNode 的字段声明。
/**
* B+ Tree
* If you understand B+ or B Tree better, M & N don't need to be the same
* Here is an example of M=N=4, with 12 keys
*
* 5
* / \
* 3 7 9
* / \ / | \
* 1 2 3 4 5 6 7 8 9 10 11 12
* @author jwang01
* @version 1.0.0 created on May 19, 2006
* edited by Spoon! 2008
* edited by Mistro 2010
*/
public class BxTree<Key extends Comparable<? super Key>, Value>
{
/** Pointer to the root node. It may be a leaf or an inner node, but it is never null. */
private Node root;
/** the maximum number of keys in the leaf node, M must be > 0 */
private final int M;
/** the maximum number of keys in inner node, the number of pointer is N+1, N must be > 2 */
private final int N;
/** Create a new empty tree. */
public BxTree(int n) {
this(n, n);
}
public BxTree(int m, int n) {
M = m;
N = n;
root = new LNode();
}
public void insert(Key key, Value value) {
System.out.println("insert key=" + key);
Split result = root.insert(key, value);
if (result != null) {
// The old root was split into two parts.
// We have to create a new root pointing to them
INode _root = new INode();
_root.num = 1;
_root.keys[0] = result.key;
_root.children[0] = result.left;
_root.children[1] = result.right;
root = _root;
}
}
/**
* Looks for the given key. If it is not found, it returns null.
* If it is found, it returns the associated value.
*/
public Value find(Key key) {
Node node = root;
while (node instanceof BxTree.INode) { // need to traverse down to the leaf
INode inner = (INode) node;
int idx = inner.getLoc(key);
node = inner.children[idx];
}
//We are @ leaf after while loop
LNode leaf = (LNode) node;
int idx = leaf.getLoc(key);
if (idx < leaf.num && leaf.keys[idx].equals(key)) {
return leaf.values[idx];
} else {
return null;
}
}
public void dump() {
root.dump();
}
abstract class Node {
protected int num; //number of keys
protected Key[] keys;
abstract public int getLoc(Key key);
// returns null if no split, otherwise returns split info
abstract public Split insert(Key key, Value value);
abstract public void dump();
}
class LNode extends Node {
// In some sense, the following casts are almost always illegal
// (if Value was replaced with a real type other than Object,
// the cast would fail); but they make our code simpler
// by allowing us to pretend we have arrays of certain types.
// They work because type erasure will erase the type variables.
// It will break if we return it and other people try to use it.
final Value[] values = (Value[]) new Object[M];
{ keys = (Key[]) new Comparable[M]; }
/**
* Returns the position where 'key' should be inserted in a leaf node
* that has the given keys.
*/
public int getLoc(Key key) {
// Simple linear search. Faster for small values of N or M, binary search would be faster for larger M / N
for (int i = 0; i < num; i++) {
if (keys[i].compareTo(key) >= 0) {
return i;
}
}
return num;
}
public Split insert(Key key, Value value) {
// Simple linear search
int i = getLoc(key);
if (this.num == M) { // The node was full. We must split it
int mid = (M+1)/2;
int sNum = this.num - mid;
LNode sibling = new LNode();
sibling.num = sNum;
System.arraycopy(this.keys, mid, sibling.keys, 0, sNum);
System.arraycopy(this.values, mid, sibling.values, 0, sNum);
this.num = mid;
if (i < mid) {
// Inserted element goes to left sibling
this.insertNonfull(key, value, i);
} else {
// Inserted element goes to right sibling
sibling.insertNonfull(key, value, i-mid);
}
// Notify the parent about the split
Split result = new Split(sibling.keys[0], //make the right's key >= result.key
this,
sibling);
return result;
} else {
// The node was not full
this.insertNonfull(key, value, i);
return null;
}
}
private void insertNonfull(Key key, Value value, int idx) {
//if (idx < M && keys[idx].equals(key)) {
if (idx < num && keys[idx].equals(key)) {
// We are inserting a duplicate value, simply overwrite the old one
values[idx] = value;
} else {
// The key we are inserting is unique
System.arraycopy(keys, idx, keys, idx+1, num-idx);
System.arraycopy(values, idx, values, idx+1, num-idx);
keys[idx] = key;
values[idx] = value;
num++;
}
}
public void dump() {
System.out.println("lNode h==0");
for (int i = 0; i < num; i++){
System.out.println(keys[i]);
}
}
}
class INode extends Node {
final Node[] children = new BxTree.Node[N+1];
{ keys = (Key[]) new Comparable[N]; }
/**
* Returns the position where 'key' should be inserted in an inner node
* that has the given keys.
*/
public int getLoc(Key key) {
// Simple linear search. Faster for small values of N or M
for (int i = 0; i < num; i++) {
if (keys[i].compareTo(key) > 0) {
return i;
}
}
return num;
// Binary search is faster when N or M is big,
}
public Split insert(Key key, Value value) {
/* Early split if node is full.
* This is not the canonical algorithm for B+ trees,
* but it is simpler and it does break the definition
* which might result in immature split, which might not be desired in database
* because additional split lead to tree's height increase by 1, thus the number of disk read
* so first search to the leaf, and split from bottom up is the correct approach.
*/
if (this.num == N) { // Split
int mid = (N+1)/2;
int sNum = this.num - mid;
INode sibling = new INode();
sibling.num = sNum;
System.arraycopy(this.keys, mid, sibling.keys, 0, sNum);
System.arraycopy(this.children, mid, sibling.children, 0, sNum+1);
this.num = mid-1;//this is important, so the middle one elevate to next depth(height), inner node's key don't repeat itself
// Set up the return variable
Split result = new Split(this.keys[mid-1],
this,
sibling);
// Now insert in the appropriate sibling
if (key.compareTo(result.key) < 0) {
this.insertNonfull(key, value);
} else {
sibling.insertNonfull(key, value);
}
return result;
} else {// No split
this.insertNonfull(key, value);
return null;
}
}
private void insertNonfull(Key key, Value value) {
// Simple linear search
int idx = getLoc(key);
Split result = children[idx].insert(key, value);
if (result != null) {
if (idx == num) {
// Insertion at the rightmost key
keys[idx] = result.key;
children[idx] = result.left;
children[idx+1] = result.right;
num++;
} else {
// Insertion not at the rightmost key
//shift i>idx to the right
System.arraycopy(keys, idx, keys, idx+1, num-idx);
System.arraycopy(children, idx, children, idx+1, num-idx+1);
children[idx] = result.left;
children[idx+1] = result.right;
keys[idx] = result.key;
num++;
}
} // else the current node is not affected
}
/**
* This one only dump integer key
*/
public void dump() {
System.out.println("iNode h==?");
for (int i = 0; i < num; i++){
children[i].dump();
System.out.print('>');
System.out.println(keys[i]);
}
children[num].dump();
}
}
class Split {
public final Key key;
public final Node left;
public final Node right;
public Split(Key k, Node l, Node r) {
key = k; left = l; right = r;
}
}
}