本文主要介绍 xdl 中的 embedding 和 statis 的底层数据结构。
embedding 和 statis 其实都是保存在ps 端上的 variable,包含 data 和 slot 部分。
只不过对于statis来说主体是__decay_rate 这个slot,data部分是shape 为[featuredim,1],初始化方式为zeros的tensor,并没有被用到。
在 ps server 端,variable 的结构如下:
class Variable {
private:
std::unique_ptr<Tensor> data_;//emd_dim的参数
std::unique_ptr<Data> slicer_;//hashmap,从hashkey到id的映射
std::unordered_map<std::string, Slot> slots_;//slot_name到对应slot
public:
struct Slot {
std::unique_ptr<Tensor> tensor;
SlotJoiner joiner;
};
};
hashmap 管理特征准入和退出策略,我们可以从函数签名中看到 ,建立hashkey到id的映射关系,ids存的是我们输入的key在data这个tensor中的位置
add_probability 会在这里起作用,进行特征准入,reused_ids会把一些id剔除
特征准入
int64_t Get(const int64_t* keys,
size_t size, bool not_insert,
float add_probability,
std::vector<size_t>* ids,
tbb::concurrent_vector<size_t>* reused_ids,
size_t* filtered_keys,
size_t block_size = 500, size_t timeout = 1800)
其中 add_probability 就是以一定概率对 feature key 进行准入。
特征退出
if (max_id > 0) {
PS_CHECK_STATUS(variable->ReShapeId(max_id));
}
if (reused_ids.size() != 0) {
std::vector<size_t> raw_reused_ids;
for (auto iter : reused_ids) {
raw_reused_ids.push_back(iter);
}
variable->ClearIds(raw_reused_ids);
}
同一个 variable,在ps会通过均衡策略保存不同的部分到不同的ps上。
向ps请求一个variable的指定key的时候,就需要计算分别向哪些ps请求哪些key,这部分工作是在client端的partition代码完成的
以初始化PsConstantInitializerOp为例,在client端调用HashInitializer,在client中调用partitioner::HashShape来计算在各个ps上的形状
void Client::HashInitializer(const std::string& variable_name,
Initializer* init,
const Client::Callback& cb) {
VariableInfo info;
CHECK_ASYNC(GetVariableInfo(variable_name, &info));
std::string extra_info;
for (auto& arg : info.args) {
extra_info += arg.first + "=" + arg.second + "&";
}
if (!extra_info.empty()) { extra_info.pop_back(); }
std::vector<Data*> inputs = Args(0, 0, extra_info, std::unique_ptr<Initializer>(init));
std::vector<std::unique_ptr<Data>>* outputs =
new std::vector<std::unique_ptr<Data>>;
std::vector<Partitioner*> splitter = {
new partitioner::HashDataType,
new partitioner::HashShape,
new partitioner::Broadcast,
new partitioner::Broadcast
};
std::vector<Partitioner*> combiner = {};
UdfData udf("HashVariableInitializer", UdfData(0), UdfData(1), UdfData(2), UdfData(3));
...
}
Status HashShape::Split(PartitionerContext* ctx, Data* src, std::vector<Data*>* dst) {
VariableInfo* info = ctx->GetVariableInfo();
std::vector<size_t> dims(info->shape.begin(), info->shape.end());
dst->clear();
for (size_t i = 0; i < info->parts.size(); i++) {
size_t k = info->parts[i].size * info->shape[0] / Hasher::kTargetRange;
dims[0] = k + 10 * sqrt(k) + 10;
Data* d = new WrapperData<TensorShape>(dims);
ctx->AddDeleter(d);
dst->push_back(d);
}
return Status::Ok();
}
client 通过 udf 通知ps做什么样的操作,比如这个初始化就是告诉 ps server 调用 HashVariableInitializer 的simplerun方法进行初始化
class HashVariableInitializer : public SimpleUdf<DataType, TensorShape, std::string, std::unique_ptr<Initializer>> {
public:
virtual Status SimpleRun(
UdfContext* ctx,
const DataType& dt,
const TensorShape& shape,
const std::string& extra_info,
const std::unique_ptr<Initializer>& initializer) const {
...
std::vector<std::string> slots;
...
ps::Status status = GetStorageManager(ctx)->Get(var_name, &var);
...
if (!status.IsOk()) {
return ctx->GetStorageManager()->Set(var_name, [&]{
HashMap* hashmap = new HashMapImpl<int64_t>(init_shape[0]);
Variable* var = new Variable(new Tensor(dt, init_shape, initializer->Clone(), Tensor::TType::kSegment, true), new WrapperData<std::unique_ptr<HashMap> >(hashmap), var_name);
Status st = InitSlots(slots, var);
...
return var;
});
} else {
...
}
Status InitSlots(const std::vector<std::string>& slots, Variable* var) const {
for (auto& slot : slots) {
...
Tensor* t = var->GetVariableLikeSlot(tuple[0], dtype, TensorShape(inner_dims), []{ return new initializer::ConstantInitializer(0); });
...
return Status::Ok();
}