本文主要介绍 xdl 中的 embedding 和 statis 的底层数据结构。

embedding 和 statis 其实都是保存在ps 端上的 variable,包含 data 和 slot 部分。

只不过对于statis来说主体是__decay_rate 这个slot,data部分是shape 为[featuredim,1],初始化方式为zeros的tensor,并没有被用到。

在 ps server 端,variable 的结构如下:

class Variable {
 
 
 private:
  std::unique_ptr<Tensor> data_;//emd_dim的参数
  std::unique_ptr<Data> slicer_;//hashmap,从hashkey到id的映射
  std::unordered_map<std::string, Slot> slots_;//slot_name到对应slot
 public:
  struct Slot {
    std::unique_ptr<Tensor> tensor;
    SlotJoiner joiner;
  };
};

hashmap 管理特征准入和退出策略,我们可以从函数签名中看到 ,建立hashkey到id的映射关系,ids存的是我们输入的key在data这个tensor中的位置

add_probability 会在这里起作用,进行特征准入,reused_ids会把一些id剔除

特征准入

int64_t Get(const int64_t* keys,
                 size_t size, bool not_insert,
                 float add_probability,
                 std::vector<size_t>* ids,
                 tbb::concurrent_vector<size_t>* reused_ids,
                 size_t* filtered_keys,
                 size_t block_size = 500, size_t timeout = 1800)

其中 add_probability 就是以一定概率对 feature key 进行准入。

特征退出

if (max_id > 0) {
  PS_CHECK_STATUS(variable->ReShapeId(max_id));
}
if (reused_ids.size() != 0) {
  std::vector<size_t> raw_reused_ids;
  for (auto iter : reused_ids) {
    raw_reused_ids.push_back(iter);
  }
  variable->ClearIds(raw_reused_ids);
}

同一个 variable,在ps会通过均衡策略保存不同的部分到不同的ps上。
向ps请求一个variable的指定key的时候,就需要计算分别向哪些ps请求哪些key,这部分工作是在client端的partition代码完成的

以初始化PsConstantInitializerOp为例,在client端调用HashInitializer,在client中调用partitioner::HashShape来计算在各个ps上的形状


void Client::HashInitializer(const std::string& variable_name,
                             Initializer* init,
                             const Client::Callback& cb) {
  VariableInfo info;
  CHECK_ASYNC(GetVariableInfo(variable_name, &info));
  std::string extra_info;
  for (auto& arg : info.args) {
    extra_info += arg.first + "=" + arg.second + "&";
  }
  if (!extra_info.empty()) { extra_info.pop_back(); }
  std::vector<Data*> inputs = Args(0, 0, extra_info, std::unique_ptr<Initializer>(init));
  std::vector<std::unique_ptr<Data>>* outputs =
    new std::vector<std::unique_ptr<Data>>;
  std::vector<Partitioner*> splitter = {
    new partitioner::HashDataType,
    new partitioner::HashShape,
    new partitioner::Broadcast,
    new partitioner::Broadcast
  };
  std::vector<Partitioner*> combiner = {};
  UdfData udf("HashVariableInitializer", UdfData(0), UdfData(1), UdfData(2), UdfData(3));
  ...
}
 
 
Status HashShape::Split(PartitionerContext* ctx, Data* src, std::vector<Data*>* dst) {
  VariableInfo* info = ctx->GetVariableInfo();
  std::vector<size_t> dims(info->shape.begin(), info->shape.end());
  dst->clear();
  for (size_t i = 0; i < info->parts.size(); i++) {
    size_t k = info->parts[i].size * info->shape[0] / Hasher::kTargetRange;
    dims[0] = k + 10 * sqrt(k) + 10;
    Data* d = new WrapperData<TensorShape>(dims);
    ctx->AddDeleter(d);
    dst->push_back(d);
  }
  return Status::Ok();
}

client 通过 udf 通知ps做什么样的操作,比如这个初始化就是告诉 ps server 调用 HashVariableInitializer 的simplerun方法进行初始化

class HashVariableInitializer : public SimpleUdf<DataType, TensorShape, std::string, std::unique_ptr<Initializer>> {
 public:
  virtual Status SimpleRun(
      UdfContext* ctx,
      const DataType& dt,
      const TensorShape& shape,
      const std::string& extra_info,
      const std::unique_ptr<Initializer>& initializer) const {
    ...
    std::vector<std::string> slots;
    ...
    ps::Status status = GetStorageManager(ctx)->Get(var_name, &var);
    ...
    if (!status.IsOk()) {
      return ctx->GetStorageManager()->Set(var_name, [&]{
            HashMap* hashmap = new HashMapImpl<int64_t>(init_shape[0]);
            Variable* var = new Variable(new Tensor(dt, init_shape, initializer->Clone(), Tensor::TType::kSegment, true), new WrapperData<std::unique_ptr<HashMap> >(hashmap), var_name);
            Status st = InitSlots(slots, var);
            ...
            return var;
          });
    } else {
      ...
  }
 
 
  Status InitSlots(const std::vector<std::string>& slots, Variable* var) const {
    for (auto& slot : slots) {
      ...
      Tensor* t = var->GetVariableLikeSlot(tuple[0], dtype, TensorShape(inner_dims), []{ return new initializer::ConstantInitializer(0); });
      ...
    return Status::Ok();
  }