Compare commits


No commits in common. "36588c650e94f901d040a0e3a7b7b1b2cdab0492" and "d1cab28a1044a147a107ae067890850041956777" have entirely different histories.

4 changed files with 176 additions and 144 deletions

View File

@@ -44,5 +44,6 @@ pub struct RenamedFeatures {
}
pub fn parse(json_str: &str) -> Result<AllConfig, Error> {
serde_json::from_str(json_str)
let all_config: AllConfig = serde_json::from_str(json_str)?;
return std::result::Result::Ok(all_config);
}
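Note: the two bodies of `parse` are equivalent. The removed line returns the `Result` from `serde_json::from_str` directly, while the added lines unwrap with `?` and re-wrap in `Ok`. A minimal sketch, assuming `AllConfig` derives `Deserialize` and `Error` is `serde_json::Error` (the empty struct is a placeholder for the real config):

```rust
use serde::Deserialize;
use serde_json::Error;

#[derive(Deserialize)]
pub struct AllConfig {} // placeholder; the real struct has fields

// Expression form: the Result from serde_json is the function's value.
pub fn parse_expr(json_str: &str) -> Result<AllConfig, Error> {
    serde_json::from_str(json_str)
}

// Explicit form from the added lines: `?` propagates the error, `Ok` re-wraps.
pub fn parse_explicit(json_str: &str) -> Result<AllConfig, Error> {
    let all_config: AllConfig = serde_json::from_str(json_str)?;
    return Ok(all_config);
}
```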

View File

@@ -16,7 +16,8 @@ use segdense::util;
use thrift::protocol::{TBinaryInputProtocol, TSerializable};
use thrift::transport::TBufferChannel;
use crate::{all_config, all_config::AllConfig};
use crate::{all_config};
use crate::all_config::AllConfig;
pub fn log_feature_match(
dr: &DataRecord,
@@ -26,22 +27,26 @@ pub fn log_feature_match(
// Note the following algorithm matches features from config using linear search.
// Also the record source is MinDataRecord. This includes only binary and continuous features for now.
for (feature_id, feature_value) in dr.continuous_features.as_ref().unwrap() {
for (feature_id, feature_value) in dr.continuous_features.as_ref().unwrap().into_iter() {
debug!(
"{dr_type} - Continuous Datarecord => Feature ID: {feature_id}, Feature value: {feature_value}"
"{} - Continous Datarecord => Feature ID: {}, Feature value: {}",
dr_type, feature_id, feature_value
);
for input_feature in &seg_dense_config.cont.input_features {
if input_feature.feature_id == *feature_id {
debug!("Matching input feature: {input_feature:?}")
debug!("Matching input feature: {:?}", input_feature)
}
}
}
for feature_id in dr.binary_features.as_ref().unwrap() {
debug!("{dr_type} - Binary Datarecord => Feature ID: {feature_id}");
for feature_id in dr.binary_features.as_ref().unwrap().into_iter() {
debug!(
"{} - Binary Datarecord => Feature ID: {}",
dr_type, feature_id
);
for input_feature in &seg_dense_config.binary.input_features {
if input_feature.feature_id == *feature_id {
debug!("Found input feature: {input_feature:?}")
debug!("Found input feature: {:?}", input_feature)
}
}
}
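The changes in this hunk swap inline format-string captures (stabilized in Rust 1.58) for positional arguments; the logged output is identical. A standalone sketch of both styles:

```rust
fn main() {
    let feature_id = 42_i64;
    let input_feature = vec![1, 2, 3];
    // Inline captured identifiers (Rust 1.58+), including the Debug spec:
    println!("Feature ID: {feature_id}, debug: {input_feature:?}");
    // Positional arguments; identical output, works on older toolchains:
    println!("Feature ID: {}, debug: {:?}", feature_id, input_feature);
}
```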
@@ -91,13 +96,15 @@ impl BatchPredictionRequestToTorchTensorConverter {
reporting_feature_ids: Vec<(i64, &str)>,
register_metric_fn: Option<impl Fn(&HistogramVec)>,
) -> BatchPredictionRequestToTorchTensorConverter {
let all_config_path = format!("{model_dir}/{model_version}/all_config.json");
let seg_dense_config_path =
format!("{model_dir}/{model_version}/segdense_transform_spec_home_recap_2022.json");
let all_config_path = format!("{}/{}/all_config.json", model_dir, model_version);
let seg_dense_config_path = format!(
"{}/{}/segdense_transform_spec_home_recap_2022.json",
model_dir, model_version
);
let seg_dense_config = util::load_config(&seg_dense_config_path);
let all_config = all_config::parse(
&fs::read_to_string(&all_config_path)
.unwrap_or_else(|error| panic!("error loading all_config.json - {error}")),
.unwrap_or_else(|error| panic!("error loading all_config.json - {}", error)),
)
.unwrap();
@@ -131,11 +138,11 @@ impl BatchPredictionRequestToTorchTensorConverter {
let (discrete_feature_metrics, continuous_feature_metrics) = METRICS.get_or_init(|| {
let discrete = HistogramVec::new(
HistogramOpts::new(":navi:feature_id:discrete", "Discrete Feature ID values")
.buckets(Vec::from([
0.0f64, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0,
.buckets(Vec::from(&[
0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0,
120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0, 190.0, 200.0, 250.0,
300.0, 500.0, 1000.0, 10000.0, 100000.0,
])),
] as &'static [f64])),
&["feature_id"],
)
.expect("metric cannot be created");
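Both bucket constructions yield the same `Vec<f64>`: `Vec::from` accepts either an owned array or a slice reference, and the `&'static [f64]` cast in the added lines is satisfied because a literal array of constants is promoted to a `'static` slice. A quick sketch:

```rust
fn main() {
    // From an owned array: moves the elements into the Vec.
    let a: Vec<f64> = Vec::from([0.0f64, 10.0, 20.0]);
    // From a slice reference: clones the elements out of the slice.
    let b: Vec<f64> = Vec::from(&[0.0, 10.0, 20.0] as &[f64]);
    assert_eq!(a, b);
}
```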
@@ -144,18 +151,18 @@ impl BatchPredictionRequestToTorchTensorConverter {
":navi:feature_id:continuous",
"continuous Feature ID values",
)
.buckets(Vec::from([
0.0f64, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0,
120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0, 190.0, 200.0, 250.0, 300.0,
500.0, 1000.0, 10000.0, 100000.0,
])),
.buckets(Vec::from(&[
0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0, 120.0,
130.0, 140.0, 150.0, 160.0, 170.0, 180.0, 190.0, 200.0, 250.0, 300.0, 500.0,
1000.0, 10000.0, 100000.0,
] as &'static [f64])),
&["feature_id"],
)
.expect("metric cannot be created");
if let Some(r) = register_metric_fn {
register_metric_fn.map(|r| {
r(&discrete);
r(&continuous);
}
});
(discrete, continuous)
});
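The same hunk trades `if let Some(r)` for `Option::map` used purely for side effects; both invoke the registration callback once when it is present. A sketch, with a hypothetical `fn(&str)` standing in for the real `Fn(&HistogramVec)`:

```rust
fn main() {
    let register_metric_fn: Option<fn(&str)> = Some(|name| println!("registered {name}"));
    // `if let` runs the registration only when a callback was supplied.
    if let Some(r) = register_metric_fn {
        r("discrete");
        r("continuous");
    }
    // `.map` for side effects behaves the same; `let _ =` discards the
    // resulting Option<()> to silence the must_use lint.
    let _ = register_metric_fn.map(|r| {
        r("discrete");
        r("continuous");
    });
}
```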
@@ -164,13 +171,16 @@ impl BatchPredictionRequestToTorchTensorConverter {
for (feature_id, feature_type) in reporting_feature_ids.iter() {
match *feature_type {
"discrete" => discrete_features_to_report.insert(*feature_id),
"continuous" => continuous_features_to_report.insert(*feature_id),
_ => panic!("Invalid feature type {feature_type} for reporting metrics!"),
"discrete" => discrete_features_to_report.insert(feature_id.clone()),
"continuous" => continuous_features_to_report.insert(feature_id.clone()),
_ => panic!(
"Invalid feature type {} for reporting metrics!",
feature_type
),
};
}
BatchPredictionRequestToTorchTensorConverter {
return BatchPredictionRequestToTorchTensorConverter {
all_config,
seg_dense_config,
all_config_path,
@@ -183,7 +193,7 @@ impl BatchPredictionRequestToTorchTensorConverter {
continuous_features_to_report,
discrete_feature_metrics,
continuous_feature_metrics,
}
};
}
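Since the reported feature IDs are `i64`, a `Copy` type, `feature_id.clone()` and `*feature_id` produce the same value; only style differs. A tiny sketch:

```rust
fn main() {
    let ids: Vec<i64> = vec![7, 42];
    for feature_id in ids.iter() {
        // For Copy types, dereferencing and cloning a &i64 yield the same copy.
        let a: i64 = *feature_id;
        let b: i64 = feature_id.clone();
        assert_eq!(a, b);
    }
}
```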
fn get_feature_id(feature_name: &str, seg_dense_config: &Root) -> i64 {
@@ -193,7 +203,7 @@ impl BatchPredictionRequestToTorchTensorConverter {
return feature.feature_id;
}
}
-1
return -1;
}
fn parse_batch_prediction_request(bytes: Vec<u8>) -> BatchPredictionRequest {
@@ -201,7 +211,7 @@ impl BatchPredictionRequestToTorchTensorConverter {
let mut bc = TBufferChannel::with_capacity(bytes.len(), 0);
bc.set_readable_bytes(&bytes);
let mut protocol = TBinaryInputProtocol::new(bc, true);
BatchPredictionRequest::read_from_in_protocol(&mut protocol).unwrap()
return BatchPredictionRequest::read_from_in_protocol(&mut protocol).unwrap();
}
fn get_embedding_tensors(
@@ -218,9 +228,9 @@ impl BatchPredictionRequestToTorchTensorConverter {
let mut working_set = vec![0 as f32; total_size];
let mut bpr_start = 0;
for (bpr, &bpr_end) in bprs.iter().zip(batch_size) {
if bpr.common_features.is_some()
&& bpr.common_features.as_ref().unwrap().tensors.is_some()
&& bpr
if bpr.common_features.is_some() {
if bpr.common_features.as_ref().unwrap().tensors.is_some() {
if bpr
.common_features
.as_ref()
.unwrap()
@@ -258,6 +268,8 @@ impl BatchPredictionRequestToTorchTensorConverter {
}
}
}
}
}
// find the feature in individual feature list and add to corresponding batch.
for (index, datarecord) in bpr.individual_features_list.iter().enumerate() {
if datarecord.tensors.is_some()
@@ -288,7 +300,7 @@ impl BatchPredictionRequestToTorchTensorConverter {
}
bpr_start = bpr_end;
}
Array2::<f32>::from_shape_vec([rows, cols], working_set).unwrap()
return Array2::<f32>::from_shape_vec([rows, cols], working_set).unwrap();
}
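Throughout these converters, a flat row-major buffer is filled via `r * cols + idx` indexing and then reshaped with `ndarray`'s `from_shape_vec`. A minimal sketch of that pattern:

```rust
use ndarray::Array2;

fn main() {
    let (rows, cols) = (2usize, 3usize);
    let mut working_set = vec![0.0f32; rows * cols];
    // Row-major flattening: element (r, c) lives at flat index r * cols + c.
    let (r, c) = (1usize, 2usize);
    working_set[r * cols + c] = 7.0;
    // from_shape_vec reinterprets the flat Vec as a 2-D array; it returns
    // Err if the shape does not match the vector length.
    let tensor = Array2::<f32>::from_shape_vec([rows, cols], working_set).unwrap();
    assert_eq!(tensor[[1, 2]], 7.0);
}
```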
// Todo : Refactor, create a generic version with different type and field accessors
@@ -298,9 +310,9 @@ impl BatchPredictionRequestToTorchTensorConverter {
// (INT64 --> INT64, DataRecord.discrete_feature)
fn get_continuous(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor {
// These need to be part of model schema
let rows = batch_ends[batch_ends.len() - 1];
let cols = 5293;
let full_size = rows * cols;
let rows: usize = batch_ends[batch_ends.len() - 1];
let cols: usize = 5293;
let full_size: usize = (rows * cols).try_into().unwrap();
let default_val = f32::NAN;
let mut tensor = vec![default_val; full_size];
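Note that `rows` and `cols` are already `usize` here, so the added `try_into().unwrap()` is an identity conversion; it would only matter if the source and target types differed. A sketch:

```rust
use std::convert::TryInto;

fn main() {
    let rows: usize = 4;
    let cols: usize = 5293;
    // usize -> usize TryInto always succeeds, so this unwrap cannot panic.
    let full_size: usize = (rows * cols).try_into().unwrap();
    assert_eq!(full_size, rows * cols);
}
```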
@@ -325,48 +337,55 @@ impl BatchPredictionRequestToTorchTensorConverter {
.unwrap();
for feature in common_features {
if let Some(f_info) = self.feature_mapper.get(feature.0) {
match self.feature_mapper.get(feature.0) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize;
if idx < cols {
// Set value in each row
for r in bpr_start..bpr_end {
let flat_index = r * cols + idx;
let flat_index: usize = (r * cols + idx).try_into().unwrap();
tensor[flat_index] = feature.1.into_inner() as f32;
}
}
}
None => (),
}
if self.continuous_features_to_report.contains(feature.0) {
self.continuous_feature_metrics
.with_label_values(&[feature.0.to_string().as_str()])
.observe(feature.1.into_inner())
.observe(feature.1.into_inner() as f64)
} else if self.discrete_features_to_report.contains(feature.0) {
self.discrete_feature_metrics
.with_label_values(&[feature.0.to_string().as_str()])
.observe(feature.1.into_inner())
.observe(feature.1.into_inner() as f64)
}
}
}
// Process the batch of datarecords
for r in bpr_start..bpr_end {
let dr: &DataRecord = &bpr.individual_features_list[r - bpr_start];
let dr: &DataRecord =
&bpr.individual_features_list[usize::try_from(r - bpr_start).unwrap()];
if dr.continuous_features.is_some() {
for feature in dr.continuous_features.as_ref().unwrap() {
if let Some(f_info) = self.feature_mapper.get(feature.0) {
match self.feature_mapper.get(&feature.0) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize;
let flat_index = r * cols + idx;
let flat_index: usize = (r * cols + idx).try_into().unwrap();
if flat_index < tensor.len() && idx < cols {
tensor[flat_index] = feature.1.into_inner() as f32;
}
}
None => (),
}
if self.continuous_features_to_report.contains(feature.0) {
self.continuous_feature_metrics
.with_label_values(&[feature.0.to_string().as_str()])
.observe(feature.1.into_inner())
.observe(feature.1.into_inner() as f64)
} else if self.discrete_features_to_report.contains(feature.0) {
self.discrete_feature_metrics
.with_label_values(&[feature.0.to_string().as_str()])
.observe(feature.1.into_inner())
.observe(feature.1.into_inner() as f64)
}
}
}
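Several hunks in this file replace `if let Some(f_info) = self.feature_mapper.get(...)` with a `match` carrying an explicit empty `None` arm; the behavior is unchanged. A self-contained sketch:

```rust
use std::collections::HashMap;

fn main() {
    let feature_mapper: HashMap<i64, usize> = HashMap::from([(7, 3)]);
    let feature_id = 7_i64;
    // `if let` form: binds on Some, silently skips None.
    if let Some(idx) = feature_mapper.get(&feature_id) {
        println!("index {idx}");
    }
    // `match` form with an explicit do-nothing None arm: same behavior.
    match feature_mapper.get(&feature_id) {
        Some(idx) => println!("index {idx}"),
        None => (),
    }
}
```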
@@ -374,19 +393,22 @@ impl BatchPredictionRequestToTorchTensorConverter {
bpr_start = bpr_end;
}
InputTensor::FloatTensor(
Array2::<f32>::from_shape_vec([rows, cols], tensor)
return InputTensor::FloatTensor(
Array2::<f32>::from_shape_vec(
[rows.try_into().unwrap(), cols.try_into().unwrap()],
tensor,
)
.unwrap()
.into_dyn(),
)
);
}
fn get_binary(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor {
// These need to be part of model schema
let rows = batch_ends[batch_ends.len() - 1];
let cols = 149;
let full_size = rows * cols;
let default_val = 0;
let rows: usize = batch_ends[batch_ends.len() - 1];
let cols: usize = 149;
let full_size: usize = (rows * cols).try_into().unwrap();
let default_val: i64 = 0;
let mut v = vec![default_val; full_size];
@@ -410,48 +432,55 @@ impl BatchPredictionRequestToTorchTensorConverter {
.unwrap();
for feature in common_features {
if let Some(f_info) = self.feature_mapper.get(feature) {
match self.feature_mapper.get(feature) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize;
if idx < cols {
// Set value in each row
for r in bpr_start..bpr_end {
let flat_index = r * cols + idx;
let flat_index: usize = (r * cols + idx).try_into().unwrap();
v[flat_index] = 1;
}
}
}
None => (),
}
}
}
// Process the batch of datarecords
for r in bpr_start..bpr_end {
let dr: &DataRecord = &bpr.individual_features_list[r - bpr_start];
let dr: &DataRecord =
&bpr.individual_features_list[usize::try_from(r - bpr_start).unwrap()];
if dr.binary_features.is_some() {
for feature in dr.binary_features.as_ref().unwrap() {
if let Some(f_info) = self.feature_mapper.get(feature) {
match self.feature_mapper.get(&feature) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize;
let flat_index = r * cols + idx;
let flat_index: usize = (r * cols + idx).try_into().unwrap();
v[flat_index] = 1;
}
None => (),
}
}
}
}
bpr_start = bpr_end;
}
InputTensor::Int64Tensor(
Array2::<i64>::from_shape_vec([rows, cols], v)
return InputTensor::Int64Tensor(
Array2::<i64>::from_shape_vec([rows.try_into().unwrap(), cols.try_into().unwrap()], v)
.unwrap()
.into_dyn(),
)
);
}
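`get_binary` builds an indicator tensor: a zeroed `i64` buffer in which matched feature columns are set to 1, reshaped into a dynamic-rank `ndarray` tensor at the end. A condensed sketch of that shape of computation:

```rust
use ndarray::Array2;

fn main() {
    let (rows, cols) = (2usize, 4usize);
    let mut v = vec![0i64; rows * cols];
    // Mark feature column 2 as present in every row, as get_binary does
    // for a matched common feature.
    for r in 0..rows {
        v[r * cols + 2] = 1;
    }
    let tensor = Array2::<i64>::from_shape_vec([rows, cols], v)
        .unwrap()
        .into_dyn();
    assert_eq!(tensor.shape(), &[2, 4]);
}
```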
#[allow(dead_code)]
fn get_discrete(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor {
// These need to be part of model schema
let rows = batch_ends[batch_ends.len() - 1];
let cols = 320;
let full_size = rows * cols;
let default_val = 0;
let rows: usize = batch_ends[batch_ends.len() - 1];
let cols: usize = 320;
let full_size: usize = (rows * cols).try_into().unwrap();
let default_val: i64 = 0;
let mut v = vec![default_val; full_size];
@@ -475,16 +504,19 @@ impl BatchPredictionRequestToTorchTensorConverter {
.unwrap();
for feature in common_features {
if let Some(f_info) = self.feature_mapper.get(feature.0) {
match self.feature_mapper.get(feature.0) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize;
if idx < cols {
// Set value in each row
for r in bpr_start..bpr_end {
let flat_index = r * cols + idx;
let flat_index: usize = (r * cols + idx).try_into().unwrap();
v[flat_index] = *feature.1;
}
}
}
None => (),
}
if self.discrete_features_to_report.contains(feature.0) {
self.discrete_feature_metrics
.with_label_values(&[feature.0.to_string().as_str()])
@@ -495,16 +527,19 @@ impl BatchPredictionRequestToTorchTensorConverter {
// Process the batch of datarecords
for r in bpr_start..bpr_end {
let dr: &DataRecord = &bpr.individual_features_list[r];
let dr: &DataRecord = &bpr.individual_features_list[usize::try_from(r).unwrap()];
if dr.discrete_features.is_some() {
for feature in dr.discrete_features.as_ref().unwrap() {
if let Some(f_info) = self.feature_mapper.get(feature.0) {
match self.feature_mapper.get(&feature.0) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize;
let flat_index = r * cols + idx;
let flat_index: usize = (r * cols + idx).try_into().unwrap();
if flat_index < v.len() && idx < cols {
v[flat_index] = *feature.1;
}
}
None => (),
}
if self.discrete_features_to_report.contains(feature.0) {
self.discrete_feature_metrics
.with_label_values(&[feature.0.to_string().as_str()])
@@ -515,11 +550,11 @@ impl BatchPredictionRequestToTorchTensorConverter {
}
bpr_start = bpr_end;
}
InputTensor::Int64Tensor(
Array2::<i64>::from_shape_vec([rows, cols], v)
return InputTensor::Int64Tensor(
Array2::<i64>::from_shape_vec([rows.try_into().unwrap(), cols.try_into().unwrap()], v)
.unwrap()
.into_dyn(),
)
);
}
fn get_user_embedding(
@@ -569,7 +604,7 @@ impl Converter for BatchPredictionRequestToTorchTensorConverter {
.map(|bpr| bpr.individual_features_list.len())
.scan(0usize, |acc, e| {
//running total
*acc += e;
*acc = *acc + e;
Some(*acc)
})
.collect::<Vec<_>>();
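The `scan` turns per-request batch sizes into cumulative end offsets; `*acc += e` and `*acc = *acc + e` are the same update spelled two ways. For example:

```rust
fn main() {
    // Per-request batch sizes -> cumulative end offsets: [2, 3, 1] -> [2, 5, 6].
    let batch_sizes = vec![2usize, 3, 1];
    let batch_ends: Vec<usize> = batch_sizes
        .iter()
        .scan(0usize, |acc, &e| {
            *acc += e; // running total
            Some(*acc)
        })
        .collect();
    assert_eq!(batch_ends, vec![2, 5, 6]);
}
```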

View File

@@ -9,17 +9,15 @@ use std::{
pub fn load_batch_prediction_request_base64(file_name: &str) -> Vec<Vec<u8>> {
let file = File::open(file_name).expect("could not read file");
let mut result = vec![];
for (mut line_count, line) in io::BufReader::new(file).lines().enumerate() {
line_count += 1;
for line in io::BufReader::new(file).lines() {
match base64::decode(line.unwrap().trim()) {
Ok(payload) => result.push(payload),
Err(err) => println!("error decoding line {file_name}:{line_count} - {err}"),
Err(err) => println!("error decoding line {}", err),
}
}
println!("result len: {}", result.len());
result
println!("reslt len: {}", result.len());
return result;
}
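The loader expects one base64-encoded payload per line; decode failures are logged and skipped rather than aborting the load. A round-trip sketch, assuming the pre-0.21 `base64` crate free functions used here:

```rust
fn main() {
    // Encode a payload, then decode it back as the loader does per line.
    let line = base64::encode(b"hello");
    match base64::decode(line.trim()) {
        Ok(payload) => assert_eq!(payload, b"hello".to_vec()),
        Err(err) => println!("error decoding line - {}", err),
    }
}
```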
pub fn save_to_npy<T: npyz::Serialize + AutoSerialize>(data: &[T], save_to: String) {
let mut writer = WriteOptions::new()
.default_dtype()

View File

@@ -1,6 +1,6 @@
# Navi: High-Performance Machine Learning Serving Server in Rust
Navi is a high-performance, versatile machine learning serving server implemented in Rust and tailored for production usage. It's designed to efficiently serve within the Twitter tech stack, offering top-notch performance while focusing on core features.
Navi is a high-performance, versatile machine learning serving server implemented in Rust, tailored for production usage. It's designed to efficiently serve within the Twitter tech stack, offering top-notch performance while focusing on core features.
## Key Features
@@ -23,14 +23,12 @@ While Navi's features may not be as comprehensive as its open-source counterpart
- `thrift_bpr_adapter`: generated thrift code for BatchPredictionRequest
## Content
We have included all *.rs source code files that make up the main Navi binaries for you to examine. However, we have not included the test and benchmark code, or various configuration files, due to data security concerns.
We include all *.rs source code that makes up the main navi binaries for you to examine. The test and benchmark code, as well as configuration files are not included due to data security concerns.
## Run
In navi/navi, you can run the following commands:
- `scripts/run_tf2.sh` for [TensorFlow](https://www.tensorflow.org/)
- `scripts/run_onnx.sh` for [Onnx](https://onnx.ai/)
Do note that you need to create a models directory and create some versions, preferably using epoch time, e.g., `1679693908377`.
in navi/navi you can run. Note you need to create a models directory and create some versions, preferably using epoch time, e.g., 1679693908377
- scripts/run_tf2.sh
- scripts/run_onnx.sh
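One way to generate an epoch-millisecond version directory name like `1679693908377`, sketched with the standard library (assuming the system clock):

```rust
use std::time::{SystemTime, UNIX_EPOCH};

fn main() {
    // Millisecond epoch timestamp, usable as a model version directory name.
    let version = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock before epoch")
        .as_millis();
    println!("models/{version}/");
}
```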
## Build
You can adapt the above scripts to build using Cargo.
you can adapt the above scripts to build using Cargo