Overall
开始看leveldb的代码。leveldb整体代码量不多,所以读起来应该不会很困难。并且有比较完善的leveldb handbook。读得时候要多加考虑他的设计因素。
这次从doc中的index看一下整体的概念。文档在这里
#include <cassert>
#include "leveldb/db.h"
leveldb::DB* db;
leveldb::Options options;
options.create_if_missing = true;
leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb", &db);
assert(status.ok());
...
打开leveldb
leveldb中很多可能遇到错误的函数的返回值都是status,这样我们就可以检查status是否为ok,或者输出他错误的信息。这里的API设计,用Status去处理返回信息。
leveldb::Status s = ...;
if (!s.ok()) cerr << s.ToString() << endl;
关闭数据库的话只要把那个指针删掉,析构他就可以
leveldb提供了三种方法来操作数据。分别是Put,Delete和Get
std::string value;
leveldb::Status s = db->Get(leveldb::ReadOptions(), key1, &value);
if (s.ok()) s = db->Put(leveldb::WriteOptions(), key2, value);
if (s.ok()) s = db->Delete(leveldb::WriteOptions(), key1);
为了防止部分更新失败。leveldb提供了WriteBatch,用来原子的更新一系列的值
#include "leveldb/write_batch.h"
...
std::string value;
leveldb::Status s = db->Get(leveldb::ReadOptions(), key1, &value);
if (s.ok()) {
leveldb::WriteBatch batch;
batch.Delete(key1);
batch.Put(key2, value);
s = db->Write(leveldb::WriteOptions(), &batch);
}
除了原子性,write batch还用来加速批量的更新操作。
leveldb中默认是异步的写入。我们可以通过修改选项来变成同步的写入
leveldb::WriteOptions write_options;
write_options.sync = true;
db->Put(write_options, ...);
相当于是写入的时候调用fsync
对于process failure来说,数据不会丢失,因为这时候数据存在内核中等待写入。而对于系统的failure,异步写入就会导致近几次的写入失效。
同步操作的开销可以通过write batch中的每个写操作均摊来减小
leveldb内部提供了同步策略。所以线程级的并发,比如不同线程并发的读写leveldb,是不需要额外的同步手段的。
leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
cout << it->key().ToString() << ": " << it->value().ToString() << endl;
}
assert(it->status().ok()); // Check for any errors found during the scan
delete it;
for (it->Seek(start);
it->Valid() && it->key().ToString() < limit;
it->Next()) {
...
}
leveldb中的迭代器。第一个例子是扫描整个数据库,第二个例子是扫描start到limit之间的数据
leveldb提供了snapshot读
leveldb::ReadOptions options;
options.snapshot = db->GetSnapshot();
... apply some updates to db ...
leveldb::Iterator* iter = db->NewIterator(options);
... read using iter to view the state when the snapshot was created ...
delete iter;
db->ReleaseSnapshot(options.snapshot);
可以看到他是传入给了option。所以大概可以猜到他是通过一些version信息来进行snapshot读
leveldb中通过key和value返回的类型是Slice。这是一个结构体,包含了长度,以及指向byte array的指针。这样我们可以不需要进行额外的拷贝(指针直接指向db内部数据吗?感觉不太可能,之后可以看一下具体的实现)
leveldb::Slice s1 = "hello";
std::string str("world");
leveldb::Slice s2 = str;
要注意slice只是引用,所以我们需要保证他引用的数据的生命周期比他长
leveldb的默认比较函数是根据字典序比较
我们可以自定义比较函数
class TwoPartComparator : public leveldb::Comparator {
public:
// Three-way comparison function:
// if a < b: negative result
// if a > b: positive result
// else: zero result
int Compare(const leveldb::Slice& a, const leveldb::Slice& b) const {
int a1, a2, b1, b2;
ParseKey(a, &a1, &a2);
ParseKey(b, &b1, &b2);
if (a1 < b1) return -1;
if (a1 > b1) return +1;
if (a2 < b2) return -1;
if (a2 > b2) return +1;
return 0;
}
// Ignore the following methods for now:
const char* Name() const { return "TwoPartComparator"; }
void FindShortestSeparator(std::string*, const leveldb::Slice&) const {}
void FindShortSuccessor(std::string*) const {}
};
在创建数据库的时候,就可以使用这个comparator
TwoPartComparator cmp;
leveldb::DB* db;
leveldb::Options options;
options.create_if_missing = true;
options.comparator = &cmp;
leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb", &db);
兼容性。leveldb每次打开数据库的时候会检查comparator的名字是否一致。如果不一致他就会打开失败。
然而我们可以通过一些手段去调整comparator。比如我们可以在key的最后面存一个version number。然后在切换到新的comparator的时候,我们修改version number并添加新的属性。在进行比较判断的时候,根据key的version来判断使用那个逻辑来比较。
我们可以通过调整option.h
中参数来调整性能
block size:
默认的block size是接近4096个未压缩的字节。block是从persistent storage传输的单位。如果我们有大量的bulk read,那增大block size会更好。如果有很多的point read,减少block size会更好。
compression:
每一个block在写入到persistent storage之前都会独立的进行compression。某些情况下用户可能希望关掉compression。
cache:
#include "leveldb/cache.h"
leveldb::Options options;
options.block_cache = leveldb::NewLRUCache(100 * 1048576); // 100MB cache
leveldb::DB* db;
leveldb::DB::Open(options, name, &db);
... use the db ...
delete db
delete options.block_cache;
cache中会保存常用的未压缩的数据。所以他应该根据application level的数据来规划大小。对于compressed data,他们的缓存会停留在os的buffer cache中。所以这里指定的cache就是解压后的kv。后续的读写就可以避免磁盘IO以及解压缩操作了。
leveldb::ReadOptions options;
options.fill_cache = false;
leveldb::Iterator* it = db->NewIterator(options);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
...
}
delete it;
在进行bulk read的时候,application可能希望关掉cache,这样他就不会影响现有的cache
key layout:
leveldb没有表的概念。所以他们的数据都是存在一起的。文档中给的例子就是假设我们希望基于leveldb实现一个文件系统,那么要存储的内容可能如下
filename -> permission-bits, length, list of file_block_ids
file_block_id -> data
如果我们让filename和file block id的前缀不同。比如filename有前缀/
,而file block id有前缀0
,由于leveldb按序存储数据,所以我们在扫描文件元数据的时候就不会扫描到data,从而加速了扫描。
filters:
leveldb通过filter来加速读取。
leveldb::Options options;
options.filter_policy = NewBloomFilterPolicy(10);
leveldb::DB* db;
leveldb::DB::Open(options, "/tmp/testdb", &db);
... use the database ...
delete db;
delete options.filter_policy;
上面的例子就是通过bloom filter来进行筛选,从而避免不必要的读取
假设我们的comparator在比较的时候会忽略尾部的空格。如果这时候用bloom filter,基本是没有效果的。因为尾部的空格很容易碰撞。这时候我们就要定义自己的filter
class CustomFilterPolicy : public leveldb::FilterPolicy {
private:
leveldb::FilterPolicy* builtin_policy_;
public:
CustomFilterPolicy() : builtin_policy_(leveldb::NewBloomFilterPolicy(10)) {}
~CustomFilterPolicy() { delete builtin_policy_; }
const char* Name() const { return "IgnoreTrailingSpacesFilter"; }
void CreateFilter(const leveldb::Slice* keys, int n, std::string* dst) const {
// Use builtin bloom filter code after removing trailing spaces
std::vector<leveldb::Slice> trimmed(n);
for (int i = 0; i < n; i++) {
trimmed[i] = RemoveTrailingSpaces(keys[i]);
}
builtin_policy_->CreateFilter(trimmed.data(), n, dst);
}
};
checksum:
checksum可以用来检查数据是否损坏。默认是关闭的
approximate size:
leveldb可以计算给定的key大概占用了多少字节
environment:
所有的file operation都是通过leveldb::Env
来进行的。用户可以提供自己的Env,从而获得更多的控制能力。比如我们可能引入额外的延迟,从而减少leveldb对系统资源的占用。
差不多就是这些,可以看到leveldb提供的功能还是非常全面的。好的系统不仅要有好的性能,内部实现,设计。同时也要有好的API来给外部用户使用。leveldb提供的这些API就非常简洁,同时给了用户很大的空间去调整。值得仔细思考学习。
文章评论