Compare commits

...

6 Commits

Author | SHA1 | Message | Date
dzhao | 36588c650e | Merge pull request #452 from pouriya/refactor-dr_transform: Refactor `navi/dr_transform` | 2023-04-04 11:06:35 -07:00
dzhao | e8147d8e5f | Update README.md | 2023-04-04 09:32:40 -07:00
dzhao | 9f0afc0ec4 | Merge pull request #550 from MrAuro/improve-navi-docs: (docs): Improve README file for Navi | 2023-04-04 09:30:11 -07:00
Auro | 9115361f00 | (docs): Improve README file for Navi | 2023-03-31 17:52:31 -07:00
Pouriya Jahanbakhsh | ee5e7fc18d | feat(navi/dr_transform): add filename:line to file reader error message | 2023-04-01 02:33:51 +03:30
Pouriya Jahanbakhsh | 2dbdfe173c | ref(navi/dr_transform): fix clippy & formatting issues | 2023-04-01 02:21:44 +03:30
4 changed files with 144 additions and 176 deletions

View File

@@ -1,6 +1,6 @@
# Navi: High-Performance Machine Learning Serving Server in Rust
Navi is a high-performance, versatile machine learning serving server implemented in Rust, tailored for production usage. It's designed to efficiently serve within the Twitter tech stack, offering top-notch performance while focusing on core features.
Navi is a high-performance, versatile machine learning serving server implemented in Rust and tailored for production usage. It's designed to efficiently serve within the Twitter tech stack, offering top-notch performance while focusing on core features.
## Key Features
@@ -23,12 +23,14 @@ While Navi's features may not be as comprehensive as its open-source counterpart
- `thrift_bpr_adapter`: generated thrift code for BatchPredictionRequest
## Content
We include all *.rs source code that makes up the main navi binaries for you to examine. The test and benchmark code, as well as configuration files are not included due to data security concerns.
We have included all *.rs source code files that make up the main Navi binaries for you to examine. However, we have not included the test and benchmark code, or various configuration files, due to data security concerns.
## Run
in navi/navi you can run. Note you need to create a models directory and create some versions, preferably using epoch time, e.g., 1679693908377
- scripts/run_tf2.sh
- scripts/run_onnx.sh
In navi/navi, you can run the following commands:
- `scripts/run_tf2.sh` for [TensorFlow](https://www.tensorflow.org/)
- `scripts/run_onnx.sh` for [Onnx](https://onnx.ai/)
Do note that you need to create a models directory and create some versions, preferably using epoch time, e.g., `1679693908377`.
## Build
you can adapt the above scripts to build using Cargo
You can adapt the above scripts to build using Cargo.
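The Run section above asks for a `models` directory containing version subdirectories named by epoch time (e.g. `1679693908377`). As a rough sketch of that layout, assuming millisecond epoch names and a `models` directory in the current working directory (the actual scripts and model config may expect a different location), one way to create a version directory is:

```rust
use std::fs;
use std::time::{SystemTime, UNIX_EPOCH};

fn main() -> std::io::Result<()> {
    // Millisecond epoch timestamp, e.g. 1679693908377, used as the version name.
    let version = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before UNIX_EPOCH")
        .as_millis();
    // "models" matches the directory name mentioned in the Run section;
    // adjust the path to wherever the serving scripts expect it.
    let dir = format!("models/{version}");
    fs::create_dir_all(&dir)?;
    println!("created model version directory: {dir}");
    Ok(())
}
```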

View File

@@ -44,6 +44,5 @@ pub struct RenamedFeatures {
}
pub fn parse(json_str: &str) -> Result<AllConfig, Error> {
let all_config: AllConfig = serde_json::from_str(json_str)?;
return std::result::Result::Ok(all_config);
serde_json::from_str(json_str)
}
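The change above drops the explicit `return std::result::Result::Ok(all_config)` and returns the `serde_json::from_str` call directly, since that call already evaluates to the function's `Result` type. A minimal self-contained sketch of the same idiom, using a hypothetical `DemoConfig` in place of `AllConfig` and assuming `serde` (with the `derive` feature) and `serde_json` as dependencies:

```rust
use serde::Deserialize;

// Hypothetical stand-in for AllConfig; the real struct lives in all_config.rs.
#[derive(Deserialize, Debug)]
struct DemoConfig {
    model_name: String,
}

// serde_json::from_str already returns Result<T, serde_json::Error>,
// so the tail expression can be returned as-is, with no `return Ok(...)`.
fn parse(json_str: &str) -> Result<DemoConfig, serde_json::Error> {
    serde_json::from_str(json_str)
}

fn main() {
    match parse(r#"{"model_name": "home-recap"}"#) {
        Ok(cfg) => println!("parsed: {cfg:?}"),
        Err(err) => println!("parse error: {err}"),
    }
}
```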

View File

@@ -16,8 +16,7 @@ use segdense::util;
use thrift::protocol::{TBinaryInputProtocol, TSerializable};
use thrift::transport::TBufferChannel;
use crate::{all_config};
use crate::all_config::AllConfig;
use crate::{all_config, all_config::AllConfig};
pub fn log_feature_match(
dr: &DataRecord,
@@ -27,26 +26,22 @@ pub fn log_feature_match(
// Note the following algorithm matches features from config using linear search.
// Also the record source is MinDataRecord. This includes only binary and continous features for now.
for (feature_id, feature_value) in dr.continuous_features.as_ref().unwrap().into_iter() {
for (feature_id, feature_value) in dr.continuous_features.as_ref().unwrap() {
debug!(
"{} - Continous Datarecord => Feature ID: {}, Feature value: {}",
dr_type, feature_id, feature_value
"{dr_type} - Continuous Datarecord => Feature ID: {feature_id}, Feature value: {feature_value}"
);
for input_feature in &seg_dense_config.cont.input_features {
if input_feature.feature_id == *feature_id {
debug!("Matching input feature: {:?}", input_feature)
debug!("Matching input feature: {input_feature:?}")
}
}
}
for feature_id in dr.binary_features.as_ref().unwrap().into_iter() {
debug!(
"{} - Binary Datarecord => Feature ID: {}",
dr_type, feature_id
);
for feature_id in dr.binary_features.as_ref().unwrap() {
debug!("{dr_type} - Binary Datarecord => Feature ID: {feature_id}");
for input_feature in &seg_dense_config.binary.input_features {
if input_feature.feature_id == *feature_id {
debug!("Found input feature: {:?}", input_feature)
debug!("Found input feature: {input_feature:?}")
}
}
}
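Besides dropping the redundant `.into_iter()` calls, this hunk switches the `debug!` calls from positional arguments to inline format-argument capture; both forms print identical output. A small sketch with made-up values:

```rust
fn main() {
    let dr_type = "Prod";
    let feature_id: i64 = 42;
    let feature_value = 0.5f64;

    // Positional arguments (the style being removed above).
    println!(
        "{} - Continuous Datarecord => Feature ID: {}, Feature value: {}",
        dr_type, feature_id, feature_value
    );

    // Inline format-argument capture (the style the refactor adopts);
    // this line prints exactly the same text as the one above.
    println!("{dr_type} - Continuous Datarecord => Feature ID: {feature_id}, Feature value: {feature_value}");
}
```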
@@ -96,15 +91,13 @@ impl BatchPredictionRequestToTorchTensorConverter {
reporting_feature_ids: Vec<(i64, &str)>,
register_metric_fn: Option<impl Fn(&HistogramVec)>,
) -> BatchPredictionRequestToTorchTensorConverter {
let all_config_path = format!("{}/{}/all_config.json", model_dir, model_version);
let seg_dense_config_path = format!(
"{}/{}/segdense_transform_spec_home_recap_2022.json",
model_dir, model_version
);
let all_config_path = format!("{model_dir}/{model_version}/all_config.json");
let seg_dense_config_path =
format!("{model_dir}/{model_version}/segdense_transform_spec_home_recap_2022.json");
let seg_dense_config = util::load_config(&seg_dense_config_path);
let all_config = all_config::parse(
&fs::read_to_string(&all_config_path)
.unwrap_or_else(|error| panic!("error loading all_config.json - {}", error)),
.unwrap_or_else(|error| panic!("error loading all_config.json - {error}")),
)
.unwrap();
@@ -138,11 +131,11 @@ impl BatchPredictionRequestToTorchTensorConverter {
let (discrete_feature_metrics, continuous_feature_metrics) = METRICS.get_or_init(|| {
let discrete = HistogramVec::new(
HistogramOpts::new(":navi:feature_id:discrete", "Discrete Feature ID values")
.buckets(Vec::from(&[
0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0,
.buckets(Vec::from([
0.0f64, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0,
120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0, 190.0, 200.0, 250.0,
300.0, 500.0, 1000.0, 10000.0, 100000.0,
] as &'static [f64])),
])),
&["feature_id"],
)
.expect("metric cannot be created");
@@ -151,18 +144,18 @@ impl BatchPredictionRequestToTorchTensorConverter {
":navi:feature_id:continuous",
"continuous Feature ID values",
)
.buckets(Vec::from(&[
0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0, 120.0,
130.0, 140.0, 150.0, 160.0, 170.0, 180.0, 190.0, 200.0, 250.0, 300.0, 500.0,
1000.0, 10000.0, 100000.0,
] as &'static [f64])),
.buckets(Vec::from([
0.0f64, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0,
120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0, 190.0, 200.0, 250.0, 300.0,
500.0, 1000.0, 10000.0, 100000.0,
])),
&["feature_id"],
)
.expect("metric cannot be created");
register_metric_fn.map(|r| {
if let Some(r) = register_metric_fn {
r(&discrete);
r(&continuous);
});
}
(discrete, continuous)
});
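The registration change above replaces `register_metric_fn.map(|r| { ... });` with an `if let`, the usual fix for clippy's `option_map_unit_fn` lint (using `map` purely for side effects). A tiny standalone sketch with a hypothetical callback:

```rust
fn main() {
    // Hypothetical metric-registration callback; None would mean "don't register".
    let register_metric_fn: Option<fn(&str)> = Some(|name| println!("registered {name}"));

    // `Option::map` used only for its side effects trips clippy's
    // `option_map_unit_fn` lint; `if let` states the intent directly.
    if let Some(r) = register_metric_fn {
        r("discrete");
        r("continuous");
    }
}
```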
@@ -171,16 +164,13 @@ impl BatchPredictionRequestToTorchTensorConverter {
for (feature_id, feature_type) in reporting_feature_ids.iter() {
match *feature_type {
"discrete" => discrete_features_to_report.insert(feature_id.clone()),
"continuous" => continuous_features_to_report.insert(feature_id.clone()),
_ => panic!(
"Invalid feature type {} for reporting metrics!",
feature_type
),
"discrete" => discrete_features_to_report.insert(*feature_id),
"continuous" => continuous_features_to_report.insert(*feature_id),
_ => panic!("Invalid feature type {feature_type} for reporting metrics!"),
};
}
return BatchPredictionRequestToTorchTensorConverter {
BatchPredictionRequestToTorchTensorConverter {
all_config,
seg_dense_config,
all_config_path,
@@ -193,7 +183,7 @@ impl BatchPredictionRequestToTorchTensorConverter {
continuous_features_to_report,
discrete_feature_metrics,
continuous_feature_metrics,
};
}
}
fn get_feature_id(feature_name: &str, seg_dense_config: &Root) -> i64 {
@@ -203,7 +193,7 @@ impl BatchPredictionRequestToTorchTensorConverter {
return feature.feature_id;
}
}
return -1;
-1
}
fn parse_batch_prediction_request(bytes: Vec<u8>) -> BatchPredictionRequest {
@@ -211,7 +201,7 @@ impl BatchPredictionRequestToTorchTensorConverter {
let mut bc = TBufferChannel::with_capacity(bytes.len(), 0);
bc.set_readable_bytes(&bytes);
let mut protocol = TBinaryInputProtocol::new(bc, true);
return BatchPredictionRequest::read_from_in_protocol(&mut protocol).unwrap();
BatchPredictionRequest::read_from_in_protocol(&mut protocol).unwrap()
}
fn get_embedding_tensors(
@@ -228,45 +218,43 @@ impl BatchPredictionRequestToTorchTensorConverter {
let mut working_set = vec![0 as f32; total_size];
let mut bpr_start = 0;
for (bpr, &bpr_end) in bprs.iter().zip(batch_size) {
if bpr.common_features.is_some() {
if bpr.common_features.as_ref().unwrap().tensors.is_some() {
if bpr
.common_features
.as_ref()
.unwrap()
.tensors
.as_ref()
.unwrap()
.contains_key(&feature_id)
if bpr.common_features.is_some()
&& bpr.common_features.as_ref().unwrap().tensors.is_some()
&& bpr
.common_features
.as_ref()
.unwrap()
.tensors
.as_ref()
.unwrap()
.contains_key(&feature_id)
{
let source_tensor = bpr
.common_features
.as_ref()
.unwrap()
.tensors
.as_ref()
.unwrap()
.get(&feature_id)
.unwrap();
let tensor = match source_tensor {
GeneralTensor::FloatTensor(float_tensor) =>
//Tensor::of_slice(
{
let source_tensor = bpr
.common_features
.as_ref()
.unwrap()
.tensors
.as_ref()
.unwrap()
.get(&feature_id)
.unwrap();
let tensor = match source_tensor {
GeneralTensor::FloatTensor(float_tensor) =>
//Tensor::of_slice(
{
float_tensor
.floats
.iter()
.map(|x| x.into_inner() as f32)
.collect::<Vec<_>>()
}
_ => vec![0 as f32; cols],
};
float_tensor
.floats
.iter()
.map(|x| x.into_inner() as f32)
.collect::<Vec<_>>()
}
_ => vec![0 as f32; cols],
};
// since the tensor is found in common feature, add it in all batches
for row in bpr_start..bpr_end {
for col in 0..cols {
working_set[row * cols + col] = tensor[col];
}
}
// since the tensor is found in common feature, add it in all batches
for row in bpr_start..bpr_end {
for col in 0..cols {
working_set[row * cols + col] = tensor[col];
}
}
}
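The restructuring above collapses three nested `is_some()`/`contains_key` checks into a single `if` chained with `&&`, the shape clippy's `collapsible_if` lint asks for. A reduced sketch of the same pattern, with a hypothetical option:

```rust
fn main() {
    let tensors: Option<Vec<i64>> = Some(vec![1, 2, 3]);
    let feature_id = 2i64;

    // Before the refactor: nested `if`s, each checking one layer.
    // After (clippy `collapsible_if`): one `if` with the checks chained by `&&`.
    if tensors.is_some() && tensors.as_ref().unwrap().contains(&feature_id) {
        println!("feature {feature_id} found");
    }
}
```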
@@ -300,7 +288,7 @@ impl BatchPredictionRequestToTorchTensorConverter {
}
bpr_start = bpr_end;
}
return Array2::<f32>::from_shape_vec([rows, cols], working_set).unwrap();
Array2::<f32>::from_shape_vec([rows, cols], working_set).unwrap()
}
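`get_embedding_tensors` fills a row-major `working_set` buffer indexed by `row * cols + col` and only converts it to an `ndarray::Array2` at the end. A minimal sketch of that pattern with made-up dimensions, assuming the `ndarray` crate:

```rust
use ndarray::Array2;

fn main() {
    // Hypothetical 2 x 3 batch, filled through a row-major flat buffer
    // in the same way `working_set[row * cols + col]` is written above.
    let (rows, cols) = (2usize, 3usize);
    let mut flat = vec![0.0f32; rows * cols];
    for row in 0..rows {
        for col in 0..cols {
            flat[row * cols + col] = (row * cols + col) as f32;
        }
    }
    // Reinterpret the flat buffer as a 2-D array once it is fully populated.
    let batch = Array2::<f32>::from_shape_vec([rows, cols], flat).unwrap();
    assert_eq!(batch[[1, 2]], 5.0);
    println!("{batch}");
}
```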
// Todo : Refactor, create a generic version with different type and field accessors
@@ -310,9 +298,9 @@ impl BatchPredictionRequestToTorchTensorConverter {
// (INT64 --> INT64, DataRecord.discrete_feature)
fn get_continuous(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor {
// These need to be part of model schema
let rows: usize = batch_ends[batch_ends.len() - 1];
let cols: usize = 5293;
let full_size: usize = (rows * cols).try_into().unwrap();
let rows = batch_ends[batch_ends.len() - 1];
let cols = 5293;
let full_size = rows * cols;
let default_val = f32::NAN;
let mut tensor = vec![default_val; full_size];
@@ -337,55 +325,48 @@ impl BatchPredictionRequestToTorchTensorConverter {
.unwrap();
for feature in common_features {
match self.feature_mapper.get(feature.0) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize;
if idx < cols {
// Set value in each row
for r in bpr_start..bpr_end {
let flat_index: usize = (r * cols + idx).try_into().unwrap();
tensor[flat_index] = feature.1.into_inner() as f32;
}
if let Some(f_info) = self.feature_mapper.get(feature.0) {
let idx = f_info.index_within_tensor as usize;
if idx < cols {
// Set value in each row
for r in bpr_start..bpr_end {
let flat_index = r * cols + idx;
tensor[flat_index] = feature.1.into_inner() as f32;
}
}
None => (),
}
if self.continuous_features_to_report.contains(feature.0) {
self.continuous_feature_metrics
.with_label_values(&[feature.0.to_string().as_str()])
.observe(feature.1.into_inner() as f64)
.observe(feature.1.into_inner())
} else if self.discrete_features_to_report.contains(feature.0) {
self.discrete_feature_metrics
.with_label_values(&[feature.0.to_string().as_str()])
.observe(feature.1.into_inner() as f64)
.observe(feature.1.into_inner())
}
}
}
// Process the batch of datarecords
for r in bpr_start..bpr_end {
let dr: &DataRecord =
&bpr.individual_features_list[usize::try_from(r - bpr_start).unwrap()];
let dr: &DataRecord = &bpr.individual_features_list[r - bpr_start];
if dr.continuous_features.is_some() {
for feature in dr.continuous_features.as_ref().unwrap() {
match self.feature_mapper.get(&feature.0) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize;
let flat_index: usize = (r * cols + idx).try_into().unwrap();
if flat_index < tensor.len() && idx < cols {
tensor[flat_index] = feature.1.into_inner() as f32;
}
if let Some(f_info) = self.feature_mapper.get(feature.0) {
let idx = f_info.index_within_tensor as usize;
let flat_index = r * cols + idx;
if flat_index < tensor.len() && idx < cols {
tensor[flat_index] = feature.1.into_inner() as f32;
}
None => (),
}
if self.continuous_features_to_report.contains(feature.0) {
self.continuous_feature_metrics
.with_label_values(&[feature.0.to_string().as_str()])
.observe(feature.1.into_inner() as f64)
.observe(feature.1.into_inner())
} else if self.discrete_features_to_report.contains(feature.0) {
self.discrete_feature_metrics
.with_label_values(&[feature.0.to_string().as_str()])
.observe(feature.1.into_inner() as f64)
.observe(feature.1.into_inner())
}
}
}
@@ -393,22 +374,19 @@ impl BatchPredictionRequestToTorchTensorConverter {
bpr_start = bpr_end;
}
return InputTensor::FloatTensor(
Array2::<f32>::from_shape_vec(
[rows.try_into().unwrap(), cols.try_into().unwrap()],
tensor,
)
.unwrap()
.into_dyn(),
);
InputTensor::FloatTensor(
Array2::<f32>::from_shape_vec([rows, cols], tensor)
.unwrap()
.into_dyn(),
)
}
fn get_binary(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor {
// These need to be part of model schema
let rows: usize = batch_ends[batch_ends.len() - 1];
let cols: usize = 149;
let full_size: usize = (rows * cols).try_into().unwrap();
let default_val: i64 = 0;
let rows = batch_ends[batch_ends.len() - 1];
let cols = 149;
let full_size = rows * cols;
let default_val = 0;
let mut v = vec![default_val; full_size];
@@ -432,55 +410,48 @@ impl BatchPredictionRequestToTorchTensorConverter {
.unwrap();
for feature in common_features {
match self.feature_mapper.get(feature) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize;
if idx < cols {
// Set value in each row
for r in bpr_start..bpr_end {
let flat_index: usize = (r * cols + idx).try_into().unwrap();
v[flat_index] = 1;
}
if let Some(f_info) = self.feature_mapper.get(feature) {
let idx = f_info.index_within_tensor as usize;
if idx < cols {
// Set value in each row
for r in bpr_start..bpr_end {
let flat_index = r * cols + idx;
v[flat_index] = 1;
}
}
None => (),
}
}
}
// Process the batch of datarecords
for r in bpr_start..bpr_end {
let dr: &DataRecord =
&bpr.individual_features_list[usize::try_from(r - bpr_start).unwrap()];
let dr: &DataRecord = &bpr.individual_features_list[r - bpr_start];
if dr.binary_features.is_some() {
for feature in dr.binary_features.as_ref().unwrap() {
match self.feature_mapper.get(&feature) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize;
let flat_index: usize = (r * cols + idx).try_into().unwrap();
v[flat_index] = 1;
}
None => (),
if let Some(f_info) = self.feature_mapper.get(feature) {
let idx = f_info.index_within_tensor as usize;
let flat_index = r * cols + idx;
v[flat_index] = 1;
}
}
}
}
bpr_start = bpr_end;
}
return InputTensor::Int64Tensor(
Array2::<i64>::from_shape_vec([rows.try_into().unwrap(), cols.try_into().unwrap()], v)
InputTensor::Int64Tensor(
Array2::<i64>::from_shape_vec([rows, cols], v)
.unwrap()
.into_dyn(),
);
)
}
#[allow(dead_code)]
fn get_discrete(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor {
// These need to be part of model schema
let rows: usize = batch_ends[batch_ends.len() - 1];
let cols: usize = 320;
let full_size: usize = (rows * cols).try_into().unwrap();
let default_val: i64 = 0;
let rows = batch_ends[batch_ends.len() - 1];
let cols = 320;
let full_size = rows * cols;
let default_val = 0;
let mut v = vec![default_val; full_size];
@@ -504,18 +475,15 @@ impl BatchPredictionRequestToTorchTensorConverter {
.unwrap();
for feature in common_features {
match self.feature_mapper.get(feature.0) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize;
if idx < cols {
// Set value in each row
for r in bpr_start..bpr_end {
let flat_index: usize = (r * cols + idx).try_into().unwrap();
v[flat_index] = *feature.1;
}
if let Some(f_info) = self.feature_mapper.get(feature.0) {
let idx = f_info.index_within_tensor as usize;
if idx < cols {
// Set value in each row
for r in bpr_start..bpr_end {
let flat_index = r * cols + idx;
v[flat_index] = *feature.1;
}
}
None => (),
}
if self.discrete_features_to_report.contains(feature.0) {
self.discrete_feature_metrics
@@ -527,18 +495,15 @@ impl BatchPredictionRequestToTorchTensorConverter {
// Process the batch of datarecords
for r in bpr_start..bpr_end {
let dr: &DataRecord = &bpr.individual_features_list[usize::try_from(r).unwrap()];
let dr: &DataRecord = &bpr.individual_features_list[r];
if dr.discrete_features.is_some() {
for feature in dr.discrete_features.as_ref().unwrap() {
match self.feature_mapper.get(&feature.0) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize;
let flat_index: usize = (r * cols + idx).try_into().unwrap();
if flat_index < v.len() && idx < cols {
v[flat_index] = *feature.1;
}
if let Some(f_info) = self.feature_mapper.get(feature.0) {
let idx = f_info.index_within_tensor as usize;
let flat_index = r * cols + idx;
if flat_index < v.len() && idx < cols {
v[flat_index] = *feature.1;
}
None => (),
}
if self.discrete_features_to_report.contains(feature.0) {
self.discrete_feature_metrics
@@ -550,11 +515,11 @@ impl BatchPredictionRequestToTorchTensorConverter {
}
bpr_start = bpr_end;
}
return InputTensor::Int64Tensor(
Array2::<i64>::from_shape_vec([rows.try_into().unwrap(), cols.try_into().unwrap()], v)
InputTensor::Int64Tensor(
Array2::<i64>::from_shape_vec([rows, cols], v)
.unwrap()
.into_dyn(),
);
)
}
fn get_user_embedding(
@@ -604,7 +569,7 @@ impl Converter for BatchPredictionRequestToTorchTensorConverter {
.map(|bpr| bpr.individual_features_list.len())
.scan(0usize, |acc, e| {
//running total
*acc = *acc + e;
*acc += e;
Some(*acc)
})
.collect::<Vec<_>>();
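This last hunk in the Converter impl builds a running total of `individual_features_list.len()` per request via `Iterator::scan`; the resulting batch-end offsets are what the tensor builders above use as row boundaries (`rows = batch_ends[batch_ends.len() - 1]`). A standalone sketch of the prefix-sum with hypothetical batch sizes:

```rust
fn main() {
    // Hypothetical number of DataRecords in each BatchPredictionRequest.
    let batch_sizes = [3usize, 1, 4];

    // Running total via `scan`, mirroring how the Converter computes batch ends;
    // the final entry doubles as the total row count of the output tensors.
    let batch_ends: Vec<usize> = batch_sizes
        .iter()
        .scan(0usize, |acc, &e| {
            *acc += e;
            Some(*acc)
        })
        .collect();

    assert_eq!(batch_ends, vec![3usize, 4, 8]);
    println!("batch_ends = {batch_ends:?}");
}
```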

View File

@@ -9,15 +9,17 @@ use std::{
pub fn load_batch_prediction_request_base64(file_name: &str) -> Vec<Vec<u8>> {
let file = File::open(file_name).expect("could not read file");
let mut result = vec![];
for line in io::BufReader::new(file).lines() {
for (mut line_count, line) in io::BufReader::new(file).lines().enumerate() {
line_count += 1;
match base64::decode(line.unwrap().trim()) {
Ok(payload) => result.push(payload),
Err(err) => println!("error decoding line {}", err),
Err(err) => println!("error decoding line {file_name}:{line_count} - {err}"),
}
}
println!("reslt len: {}", result.len());
return result;
println!("result len: {}", result.len());
result
}
pub fn save_to_npy<T: npyz::Serialize + AutoSerialize>(data: &[T], save_to: String) {
let mut writer = WriteOptions::new()
.default_dtype()