Compare commits
6 Commits
d1cab28a10
...
36588c650e
Author | SHA1 | Date |
---|---|---|
dzhao | 36588c650e | |
dzhao | e8147d8e5f | |
dzhao | 9f0afc0ec4 | |
Auro | 9115361f00 | |
Pouriya Jahanbakhsh | ee5e7fc18d | |
Pouriya Jahanbakhsh | 2dbdfe173c |
|
@ -1,6 +1,6 @@
|
||||||
# Navi: High-Performance Machine Learning Serving Server in Rust
|
# Navi: High-Performance Machine Learning Serving Server in Rust
|
||||||
|
|
||||||
Navi is a high-performance, versatile machine learning serving server implemented in Rust, tailored for production usage. It's designed to efficiently serve within the Twitter tech stack, offering top-notch performance while focusing on core features.
|
Navi is a high-performance, versatile machine learning serving server implemented in Rust and tailored for production usage. It's designed to efficiently serve within the Twitter tech stack, offering top-notch performance while focusing on core features.
|
||||||
|
|
||||||
## Key Features
|
## Key Features
|
||||||
|
|
||||||
|
@ -23,12 +23,14 @@ While Navi's features may not be as comprehensive as its open-source counterpart
|
||||||
- `thrift_bpr_adapter`: generated thrift code for BatchPredictionRequest
|
- `thrift_bpr_adapter`: generated thrift code for BatchPredictionRequest
|
||||||
|
|
||||||
## Content
|
## Content
|
||||||
We include all *.rs source code that makes up the main navi binaries for you to examine. The test and benchmark code, as well as configuration files are not included due to data security concerns.
|
We have included all *.rs source code files that make up the main Navi binaries for you to examine. However, we have not included the test and benchmark code, or various configuration files, due to data security concerns.
|
||||||
|
|
||||||
## Run
|
## Run
|
||||||
in navi/navi you can run. Note you need to create a models directory and create some versions, preferably using epoch time, e.g., 1679693908377
|
In navi/navi, you can run the following commands:
|
||||||
- scripts/run_tf2.sh
|
- `scripts/run_tf2.sh` for [TensorFlow](https://www.tensorflow.org/)
|
||||||
- scripts/run_onnx.sh
|
- `scripts/run_onnx.sh` for [Onnx](https://onnx.ai/)
|
||||||
|
|
||||||
|
Do note that you need to create a models directory and create some versions, preferably using epoch time, e.g., `1679693908377`.
|
||||||
|
|
||||||
## Build
|
## Build
|
||||||
you can adapt the above scripts to build using Cargo
|
You can adapt the above scripts to build using Cargo.
|
|
@ -44,6 +44,5 @@ pub struct RenamedFeatures {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn parse(json_str: &str) -> Result<AllConfig, Error> {
|
pub fn parse(json_str: &str) -> Result<AllConfig, Error> {
|
||||||
let all_config: AllConfig = serde_json::from_str(json_str)?;
|
serde_json::from_str(json_str)
|
||||||
return std::result::Result::Ok(all_config);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,8 +16,7 @@ use segdense::util;
|
||||||
use thrift::protocol::{TBinaryInputProtocol, TSerializable};
|
use thrift::protocol::{TBinaryInputProtocol, TSerializable};
|
||||||
use thrift::transport::TBufferChannel;
|
use thrift::transport::TBufferChannel;
|
||||||
|
|
||||||
use crate::{all_config};
|
use crate::{all_config, all_config::AllConfig};
|
||||||
use crate::all_config::AllConfig;
|
|
||||||
|
|
||||||
pub fn log_feature_match(
|
pub fn log_feature_match(
|
||||||
dr: &DataRecord,
|
dr: &DataRecord,
|
||||||
|
@ -27,26 +26,22 @@ pub fn log_feature_match(
|
||||||
// Note the following algorithm matches features from config using linear search.
|
// Note the following algorithm matches features from config using linear search.
|
||||||
// Also the record source is MinDataRecord. This includes only binary and continous features for now.
|
// Also the record source is MinDataRecord. This includes only binary and continous features for now.
|
||||||
|
|
||||||
for (feature_id, feature_value) in dr.continuous_features.as_ref().unwrap().into_iter() {
|
for (feature_id, feature_value) in dr.continuous_features.as_ref().unwrap() {
|
||||||
debug!(
|
debug!(
|
||||||
"{} - Continous Datarecord => Feature ID: {}, Feature value: {}",
|
"{dr_type} - Continuous Datarecord => Feature ID: {feature_id}, Feature value: {feature_value}"
|
||||||
dr_type, feature_id, feature_value
|
|
||||||
);
|
);
|
||||||
for input_feature in &seg_dense_config.cont.input_features {
|
for input_feature in &seg_dense_config.cont.input_features {
|
||||||
if input_feature.feature_id == *feature_id {
|
if input_feature.feature_id == *feature_id {
|
||||||
debug!("Matching input feature: {:?}", input_feature)
|
debug!("Matching input feature: {input_feature:?}")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for feature_id in dr.binary_features.as_ref().unwrap().into_iter() {
|
for feature_id in dr.binary_features.as_ref().unwrap() {
|
||||||
debug!(
|
debug!("{dr_type} - Binary Datarecord => Feature ID: {feature_id}");
|
||||||
"{} - Binary Datarecord => Feature ID: {}",
|
|
||||||
dr_type, feature_id
|
|
||||||
);
|
|
||||||
for input_feature in &seg_dense_config.binary.input_features {
|
for input_feature in &seg_dense_config.binary.input_features {
|
||||||
if input_feature.feature_id == *feature_id {
|
if input_feature.feature_id == *feature_id {
|
||||||
debug!("Found input feature: {:?}", input_feature)
|
debug!("Found input feature: {input_feature:?}")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -96,15 +91,13 @@ impl BatchPredictionRequestToTorchTensorConverter {
|
||||||
reporting_feature_ids: Vec<(i64, &str)>,
|
reporting_feature_ids: Vec<(i64, &str)>,
|
||||||
register_metric_fn: Option<impl Fn(&HistogramVec)>,
|
register_metric_fn: Option<impl Fn(&HistogramVec)>,
|
||||||
) -> BatchPredictionRequestToTorchTensorConverter {
|
) -> BatchPredictionRequestToTorchTensorConverter {
|
||||||
let all_config_path = format!("{}/{}/all_config.json", model_dir, model_version);
|
let all_config_path = format!("{model_dir}/{model_version}/all_config.json");
|
||||||
let seg_dense_config_path = format!(
|
let seg_dense_config_path =
|
||||||
"{}/{}/segdense_transform_spec_home_recap_2022.json",
|
format!("{model_dir}/{model_version}/segdense_transform_spec_home_recap_2022.json");
|
||||||
model_dir, model_version
|
|
||||||
);
|
|
||||||
let seg_dense_config = util::load_config(&seg_dense_config_path);
|
let seg_dense_config = util::load_config(&seg_dense_config_path);
|
||||||
let all_config = all_config::parse(
|
let all_config = all_config::parse(
|
||||||
&fs::read_to_string(&all_config_path)
|
&fs::read_to_string(&all_config_path)
|
||||||
.unwrap_or_else(|error| panic!("error loading all_config.json - {}", error)),
|
.unwrap_or_else(|error| panic!("error loading all_config.json - {error}")),
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
@ -138,11 +131,11 @@ impl BatchPredictionRequestToTorchTensorConverter {
|
||||||
let (discrete_feature_metrics, continuous_feature_metrics) = METRICS.get_or_init(|| {
|
let (discrete_feature_metrics, continuous_feature_metrics) = METRICS.get_or_init(|| {
|
||||||
let discrete = HistogramVec::new(
|
let discrete = HistogramVec::new(
|
||||||
HistogramOpts::new(":navi:feature_id:discrete", "Discrete Feature ID values")
|
HistogramOpts::new(":navi:feature_id:discrete", "Discrete Feature ID values")
|
||||||
.buckets(Vec::from(&[
|
.buckets(Vec::from([
|
||||||
0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0,
|
0.0f64, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0,
|
||||||
120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0, 190.0, 200.0, 250.0,
|
120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0, 190.0, 200.0, 250.0,
|
||||||
300.0, 500.0, 1000.0, 10000.0, 100000.0,
|
300.0, 500.0, 1000.0, 10000.0, 100000.0,
|
||||||
] as &'static [f64])),
|
])),
|
||||||
&["feature_id"],
|
&["feature_id"],
|
||||||
)
|
)
|
||||||
.expect("metric cannot be created");
|
.expect("metric cannot be created");
|
||||||
|
@ -151,18 +144,18 @@ impl BatchPredictionRequestToTorchTensorConverter {
|
||||||
":navi:feature_id:continuous",
|
":navi:feature_id:continuous",
|
||||||
"continuous Feature ID values",
|
"continuous Feature ID values",
|
||||||
)
|
)
|
||||||
.buckets(Vec::from(&[
|
.buckets(Vec::from([
|
||||||
0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0, 120.0,
|
0.0f64, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0,
|
||||||
130.0, 140.0, 150.0, 160.0, 170.0, 180.0, 190.0, 200.0, 250.0, 300.0, 500.0,
|
120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0, 190.0, 200.0, 250.0, 300.0,
|
||||||
1000.0, 10000.0, 100000.0,
|
500.0, 1000.0, 10000.0, 100000.0,
|
||||||
] as &'static [f64])),
|
])),
|
||||||
&["feature_id"],
|
&["feature_id"],
|
||||||
)
|
)
|
||||||
.expect("metric cannot be created");
|
.expect("metric cannot be created");
|
||||||
register_metric_fn.map(|r| {
|
if let Some(r) = register_metric_fn {
|
||||||
r(&discrete);
|
r(&discrete);
|
||||||
r(&continuous);
|
r(&continuous);
|
||||||
});
|
}
|
||||||
(discrete, continuous)
|
(discrete, continuous)
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -171,16 +164,13 @@ impl BatchPredictionRequestToTorchTensorConverter {
|
||||||
|
|
||||||
for (feature_id, feature_type) in reporting_feature_ids.iter() {
|
for (feature_id, feature_type) in reporting_feature_ids.iter() {
|
||||||
match *feature_type {
|
match *feature_type {
|
||||||
"discrete" => discrete_features_to_report.insert(feature_id.clone()),
|
"discrete" => discrete_features_to_report.insert(*feature_id),
|
||||||
"continuous" => continuous_features_to_report.insert(feature_id.clone()),
|
"continuous" => continuous_features_to_report.insert(*feature_id),
|
||||||
_ => panic!(
|
_ => panic!("Invalid feature type {feature_type} for reporting metrics!"),
|
||||||
"Invalid feature type {} for reporting metrics!",
|
|
||||||
feature_type
|
|
||||||
),
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
return BatchPredictionRequestToTorchTensorConverter {
|
BatchPredictionRequestToTorchTensorConverter {
|
||||||
all_config,
|
all_config,
|
||||||
seg_dense_config,
|
seg_dense_config,
|
||||||
all_config_path,
|
all_config_path,
|
||||||
|
@ -193,7 +183,7 @@ impl BatchPredictionRequestToTorchTensorConverter {
|
||||||
continuous_features_to_report,
|
continuous_features_to_report,
|
||||||
discrete_feature_metrics,
|
discrete_feature_metrics,
|
||||||
continuous_feature_metrics,
|
continuous_feature_metrics,
|
||||||
};
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_feature_id(feature_name: &str, seg_dense_config: &Root) -> i64 {
|
fn get_feature_id(feature_name: &str, seg_dense_config: &Root) -> i64 {
|
||||||
|
@ -203,7 +193,7 @@ impl BatchPredictionRequestToTorchTensorConverter {
|
||||||
return feature.feature_id;
|
return feature.feature_id;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return -1;
|
-1
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parse_batch_prediction_request(bytes: Vec<u8>) -> BatchPredictionRequest {
|
fn parse_batch_prediction_request(bytes: Vec<u8>) -> BatchPredictionRequest {
|
||||||
|
@ -211,7 +201,7 @@ impl BatchPredictionRequestToTorchTensorConverter {
|
||||||
let mut bc = TBufferChannel::with_capacity(bytes.len(), 0);
|
let mut bc = TBufferChannel::with_capacity(bytes.len(), 0);
|
||||||
bc.set_readable_bytes(&bytes);
|
bc.set_readable_bytes(&bytes);
|
||||||
let mut protocol = TBinaryInputProtocol::new(bc, true);
|
let mut protocol = TBinaryInputProtocol::new(bc, true);
|
||||||
return BatchPredictionRequest::read_from_in_protocol(&mut protocol).unwrap();
|
BatchPredictionRequest::read_from_in_protocol(&mut protocol).unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_embedding_tensors(
|
fn get_embedding_tensors(
|
||||||
|
@ -228,45 +218,43 @@ impl BatchPredictionRequestToTorchTensorConverter {
|
||||||
let mut working_set = vec![0 as f32; total_size];
|
let mut working_set = vec![0 as f32; total_size];
|
||||||
let mut bpr_start = 0;
|
let mut bpr_start = 0;
|
||||||
for (bpr, &bpr_end) in bprs.iter().zip(batch_size) {
|
for (bpr, &bpr_end) in bprs.iter().zip(batch_size) {
|
||||||
if bpr.common_features.is_some() {
|
if bpr.common_features.is_some()
|
||||||
if bpr.common_features.as_ref().unwrap().tensors.is_some() {
|
&& bpr.common_features.as_ref().unwrap().tensors.is_some()
|
||||||
if bpr
|
&& bpr
|
||||||
.common_features
|
.common_features
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.tensors
|
.tensors
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.contains_key(&feature_id)
|
.contains_key(&feature_id)
|
||||||
|
{
|
||||||
|
let source_tensor = bpr
|
||||||
|
.common_features
|
||||||
|
.as_ref()
|
||||||
|
.unwrap()
|
||||||
|
.tensors
|
||||||
|
.as_ref()
|
||||||
|
.unwrap()
|
||||||
|
.get(&feature_id)
|
||||||
|
.unwrap();
|
||||||
|
let tensor = match source_tensor {
|
||||||
|
GeneralTensor::FloatTensor(float_tensor) =>
|
||||||
|
//Tensor::of_slice(
|
||||||
{
|
{
|
||||||
let source_tensor = bpr
|
float_tensor
|
||||||
.common_features
|
.floats
|
||||||
.as_ref()
|
.iter()
|
||||||
.unwrap()
|
.map(|x| x.into_inner() as f32)
|
||||||
.tensors
|
.collect::<Vec<_>>()
|
||||||
.as_ref()
|
}
|
||||||
.unwrap()
|
_ => vec![0 as f32; cols],
|
||||||
.get(&feature_id)
|
};
|
||||||
.unwrap();
|
|
||||||
let tensor = match source_tensor {
|
|
||||||
GeneralTensor::FloatTensor(float_tensor) =>
|
|
||||||
//Tensor::of_slice(
|
|
||||||
{
|
|
||||||
float_tensor
|
|
||||||
.floats
|
|
||||||
.iter()
|
|
||||||
.map(|x| x.into_inner() as f32)
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
}
|
|
||||||
_ => vec![0 as f32; cols],
|
|
||||||
};
|
|
||||||
|
|
||||||
// since the tensor is found in common feature, add it in all batches
|
// since the tensor is found in common feature, add it in all batches
|
||||||
for row in bpr_start..bpr_end {
|
for row in bpr_start..bpr_end {
|
||||||
for col in 0..cols {
|
for col in 0..cols {
|
||||||
working_set[row * cols + col] = tensor[col];
|
working_set[row * cols + col] = tensor[col];
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -300,7 +288,7 @@ impl BatchPredictionRequestToTorchTensorConverter {
|
||||||
}
|
}
|
||||||
bpr_start = bpr_end;
|
bpr_start = bpr_end;
|
||||||
}
|
}
|
||||||
return Array2::<f32>::from_shape_vec([rows, cols], working_set).unwrap();
|
Array2::<f32>::from_shape_vec([rows, cols], working_set).unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Todo : Refactor, create a generic version with different type and field accessors
|
// Todo : Refactor, create a generic version with different type and field accessors
|
||||||
|
@ -310,9 +298,9 @@ impl BatchPredictionRequestToTorchTensorConverter {
|
||||||
// (INT64 --> INT64, DataRecord.discrete_feature)
|
// (INT64 --> INT64, DataRecord.discrete_feature)
|
||||||
fn get_continuous(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor {
|
fn get_continuous(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor {
|
||||||
// These need to be part of model schema
|
// These need to be part of model schema
|
||||||
let rows: usize = batch_ends[batch_ends.len() - 1];
|
let rows = batch_ends[batch_ends.len() - 1];
|
||||||
let cols: usize = 5293;
|
let cols = 5293;
|
||||||
let full_size: usize = (rows * cols).try_into().unwrap();
|
let full_size = rows * cols;
|
||||||
let default_val = f32::NAN;
|
let default_val = f32::NAN;
|
||||||
|
|
||||||
let mut tensor = vec![default_val; full_size];
|
let mut tensor = vec![default_val; full_size];
|
||||||
|
@ -337,55 +325,48 @@ impl BatchPredictionRequestToTorchTensorConverter {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
for feature in common_features {
|
for feature in common_features {
|
||||||
match self.feature_mapper.get(feature.0) {
|
if let Some(f_info) = self.feature_mapper.get(feature.0) {
|
||||||
Some(f_info) => {
|
let idx = f_info.index_within_tensor as usize;
|
||||||
let idx = f_info.index_within_tensor as usize;
|
if idx < cols {
|
||||||
if idx < cols {
|
// Set value in each row
|
||||||
// Set value in each row
|
for r in bpr_start..bpr_end {
|
||||||
for r in bpr_start..bpr_end {
|
let flat_index = r * cols + idx;
|
||||||
let flat_index: usize = (r * cols + idx).try_into().unwrap();
|
tensor[flat_index] = feature.1.into_inner() as f32;
|
||||||
tensor[flat_index] = feature.1.into_inner() as f32;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
None => (),
|
|
||||||
}
|
}
|
||||||
if self.continuous_features_to_report.contains(feature.0) {
|
if self.continuous_features_to_report.contains(feature.0) {
|
||||||
self.continuous_feature_metrics
|
self.continuous_feature_metrics
|
||||||
.with_label_values(&[feature.0.to_string().as_str()])
|
.with_label_values(&[feature.0.to_string().as_str()])
|
||||||
.observe(feature.1.into_inner() as f64)
|
.observe(feature.1.into_inner())
|
||||||
} else if self.discrete_features_to_report.contains(feature.0) {
|
} else if self.discrete_features_to_report.contains(feature.0) {
|
||||||
self.discrete_feature_metrics
|
self.discrete_feature_metrics
|
||||||
.with_label_values(&[feature.0.to_string().as_str()])
|
.with_label_values(&[feature.0.to_string().as_str()])
|
||||||
.observe(feature.1.into_inner() as f64)
|
.observe(feature.1.into_inner())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process the batch of datarecords
|
// Process the batch of datarecords
|
||||||
for r in bpr_start..bpr_end {
|
for r in bpr_start..bpr_end {
|
||||||
let dr: &DataRecord =
|
let dr: &DataRecord = &bpr.individual_features_list[r - bpr_start];
|
||||||
&bpr.individual_features_list[usize::try_from(r - bpr_start).unwrap()];
|
|
||||||
if dr.continuous_features.is_some() {
|
if dr.continuous_features.is_some() {
|
||||||
for feature in dr.continuous_features.as_ref().unwrap() {
|
for feature in dr.continuous_features.as_ref().unwrap() {
|
||||||
match self.feature_mapper.get(&feature.0) {
|
if let Some(f_info) = self.feature_mapper.get(feature.0) {
|
||||||
Some(f_info) => {
|
let idx = f_info.index_within_tensor as usize;
|
||||||
let idx = f_info.index_within_tensor as usize;
|
let flat_index = r * cols + idx;
|
||||||
let flat_index: usize = (r * cols + idx).try_into().unwrap();
|
if flat_index < tensor.len() && idx < cols {
|
||||||
if flat_index < tensor.len() && idx < cols {
|
tensor[flat_index] = feature.1.into_inner() as f32;
|
||||||
tensor[flat_index] = feature.1.into_inner() as f32;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
None => (),
|
|
||||||
}
|
}
|
||||||
if self.continuous_features_to_report.contains(feature.0) {
|
if self.continuous_features_to_report.contains(feature.0) {
|
||||||
self.continuous_feature_metrics
|
self.continuous_feature_metrics
|
||||||
.with_label_values(&[feature.0.to_string().as_str()])
|
.with_label_values(&[feature.0.to_string().as_str()])
|
||||||
.observe(feature.1.into_inner() as f64)
|
.observe(feature.1.into_inner())
|
||||||
} else if self.discrete_features_to_report.contains(feature.0) {
|
} else if self.discrete_features_to_report.contains(feature.0) {
|
||||||
self.discrete_feature_metrics
|
self.discrete_feature_metrics
|
||||||
.with_label_values(&[feature.0.to_string().as_str()])
|
.with_label_values(&[feature.0.to_string().as_str()])
|
||||||
.observe(feature.1.into_inner() as f64)
|
.observe(feature.1.into_inner())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -393,22 +374,19 @@ impl BatchPredictionRequestToTorchTensorConverter {
|
||||||
bpr_start = bpr_end;
|
bpr_start = bpr_end;
|
||||||
}
|
}
|
||||||
|
|
||||||
return InputTensor::FloatTensor(
|
InputTensor::FloatTensor(
|
||||||
Array2::<f32>::from_shape_vec(
|
Array2::<f32>::from_shape_vec([rows, cols], tensor)
|
||||||
[rows.try_into().unwrap(), cols.try_into().unwrap()],
|
.unwrap()
|
||||||
tensor,
|
.into_dyn(),
|
||||||
)
|
)
|
||||||
.unwrap()
|
|
||||||
.into_dyn(),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_binary(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor {
|
fn get_binary(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor {
|
||||||
// These need to be part of model schema
|
// These need to be part of model schema
|
||||||
let rows: usize = batch_ends[batch_ends.len() - 1];
|
let rows = batch_ends[batch_ends.len() - 1];
|
||||||
let cols: usize = 149;
|
let cols = 149;
|
||||||
let full_size: usize = (rows * cols).try_into().unwrap();
|
let full_size = rows * cols;
|
||||||
let default_val: i64 = 0;
|
let default_val = 0;
|
||||||
|
|
||||||
let mut v = vec![default_val; full_size];
|
let mut v = vec![default_val; full_size];
|
||||||
|
|
||||||
|
@ -432,55 +410,48 @@ impl BatchPredictionRequestToTorchTensorConverter {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
for feature in common_features {
|
for feature in common_features {
|
||||||
match self.feature_mapper.get(feature) {
|
if let Some(f_info) = self.feature_mapper.get(feature) {
|
||||||
Some(f_info) => {
|
let idx = f_info.index_within_tensor as usize;
|
||||||
let idx = f_info.index_within_tensor as usize;
|
if idx < cols {
|
||||||
if idx < cols {
|
// Set value in each row
|
||||||
// Set value in each row
|
for r in bpr_start..bpr_end {
|
||||||
for r in bpr_start..bpr_end {
|
let flat_index = r * cols + idx;
|
||||||
let flat_index: usize = (r * cols + idx).try_into().unwrap();
|
v[flat_index] = 1;
|
||||||
v[flat_index] = 1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
None => (),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process the batch of datarecords
|
// Process the batch of datarecords
|
||||||
for r in bpr_start..bpr_end {
|
for r in bpr_start..bpr_end {
|
||||||
let dr: &DataRecord =
|
let dr: &DataRecord = &bpr.individual_features_list[r - bpr_start];
|
||||||
&bpr.individual_features_list[usize::try_from(r - bpr_start).unwrap()];
|
|
||||||
if dr.binary_features.is_some() {
|
if dr.binary_features.is_some() {
|
||||||
for feature in dr.binary_features.as_ref().unwrap() {
|
for feature in dr.binary_features.as_ref().unwrap() {
|
||||||
match self.feature_mapper.get(&feature) {
|
if let Some(f_info) = self.feature_mapper.get(feature) {
|
||||||
Some(f_info) => {
|
let idx = f_info.index_within_tensor as usize;
|
||||||
let idx = f_info.index_within_tensor as usize;
|
let flat_index = r * cols + idx;
|
||||||
let flat_index: usize = (r * cols + idx).try_into().unwrap();
|
v[flat_index] = 1;
|
||||||
v[flat_index] = 1;
|
|
||||||
}
|
|
||||||
None => (),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
bpr_start = bpr_end;
|
bpr_start = bpr_end;
|
||||||
}
|
}
|
||||||
return InputTensor::Int64Tensor(
|
InputTensor::Int64Tensor(
|
||||||
Array2::<i64>::from_shape_vec([rows.try_into().unwrap(), cols.try_into().unwrap()], v)
|
Array2::<i64>::from_shape_vec([rows, cols], v)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.into_dyn(),
|
.into_dyn(),
|
||||||
);
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
fn get_discrete(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor {
|
fn get_discrete(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor {
|
||||||
// These need to be part of model schema
|
// These need to be part of model schema
|
||||||
let rows: usize = batch_ends[batch_ends.len() - 1];
|
let rows = batch_ends[batch_ends.len() - 1];
|
||||||
let cols: usize = 320;
|
let cols = 320;
|
||||||
let full_size: usize = (rows * cols).try_into().unwrap();
|
let full_size = rows * cols;
|
||||||
let default_val: i64 = 0;
|
let default_val = 0;
|
||||||
|
|
||||||
let mut v = vec![default_val; full_size];
|
let mut v = vec![default_val; full_size];
|
||||||
|
|
||||||
|
@ -504,18 +475,15 @@ impl BatchPredictionRequestToTorchTensorConverter {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
for feature in common_features {
|
for feature in common_features {
|
||||||
match self.feature_mapper.get(feature.0) {
|
if let Some(f_info) = self.feature_mapper.get(feature.0) {
|
||||||
Some(f_info) => {
|
let idx = f_info.index_within_tensor as usize;
|
||||||
let idx = f_info.index_within_tensor as usize;
|
if idx < cols {
|
||||||
if idx < cols {
|
// Set value in each row
|
||||||
// Set value in each row
|
for r in bpr_start..bpr_end {
|
||||||
for r in bpr_start..bpr_end {
|
let flat_index = r * cols + idx;
|
||||||
let flat_index: usize = (r * cols + idx).try_into().unwrap();
|
v[flat_index] = *feature.1;
|
||||||
v[flat_index] = *feature.1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
None => (),
|
|
||||||
}
|
}
|
||||||
if self.discrete_features_to_report.contains(feature.0) {
|
if self.discrete_features_to_report.contains(feature.0) {
|
||||||
self.discrete_feature_metrics
|
self.discrete_feature_metrics
|
||||||
|
@ -527,18 +495,15 @@ impl BatchPredictionRequestToTorchTensorConverter {
|
||||||
|
|
||||||
// Process the batch of datarecords
|
// Process the batch of datarecords
|
||||||
for r in bpr_start..bpr_end {
|
for r in bpr_start..bpr_end {
|
||||||
let dr: &DataRecord = &bpr.individual_features_list[usize::try_from(r).unwrap()];
|
let dr: &DataRecord = &bpr.individual_features_list[r];
|
||||||
if dr.discrete_features.is_some() {
|
if dr.discrete_features.is_some() {
|
||||||
for feature in dr.discrete_features.as_ref().unwrap() {
|
for feature in dr.discrete_features.as_ref().unwrap() {
|
||||||
match self.feature_mapper.get(&feature.0) {
|
if let Some(f_info) = self.feature_mapper.get(feature.0) {
|
||||||
Some(f_info) => {
|
let idx = f_info.index_within_tensor as usize;
|
||||||
let idx = f_info.index_within_tensor as usize;
|
let flat_index = r * cols + idx;
|
||||||
let flat_index: usize = (r * cols + idx).try_into().unwrap();
|
if flat_index < v.len() && idx < cols {
|
||||||
if flat_index < v.len() && idx < cols {
|
v[flat_index] = *feature.1;
|
||||||
v[flat_index] = *feature.1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
None => (),
|
|
||||||
}
|
}
|
||||||
if self.discrete_features_to_report.contains(feature.0) {
|
if self.discrete_features_to_report.contains(feature.0) {
|
||||||
self.discrete_feature_metrics
|
self.discrete_feature_metrics
|
||||||
|
@ -550,11 +515,11 @@ impl BatchPredictionRequestToTorchTensorConverter {
|
||||||
}
|
}
|
||||||
bpr_start = bpr_end;
|
bpr_start = bpr_end;
|
||||||
}
|
}
|
||||||
return InputTensor::Int64Tensor(
|
InputTensor::Int64Tensor(
|
||||||
Array2::<i64>::from_shape_vec([rows.try_into().unwrap(), cols.try_into().unwrap()], v)
|
Array2::<i64>::from_shape_vec([rows, cols], v)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.into_dyn(),
|
.into_dyn(),
|
||||||
);
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_user_embedding(
|
fn get_user_embedding(
|
||||||
|
@ -604,7 +569,7 @@ impl Converter for BatchPredictionRequestToTorchTensorConverter {
|
||||||
.map(|bpr| bpr.individual_features_list.len())
|
.map(|bpr| bpr.individual_features_list.len())
|
||||||
.scan(0usize, |acc, e| {
|
.scan(0usize, |acc, e| {
|
||||||
//running total
|
//running total
|
||||||
*acc = *acc + e;
|
*acc += e;
|
||||||
Some(*acc)
|
Some(*acc)
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
|
@ -9,15 +9,17 @@ use std::{
|
||||||
pub fn load_batch_prediction_request_base64(file_name: &str) -> Vec<Vec<u8>> {
|
pub fn load_batch_prediction_request_base64(file_name: &str) -> Vec<Vec<u8>> {
|
||||||
let file = File::open(file_name).expect("could not read file");
|
let file = File::open(file_name).expect("could not read file");
|
||||||
let mut result = vec![];
|
let mut result = vec![];
|
||||||
for line in io::BufReader::new(file).lines() {
|
for (mut line_count, line) in io::BufReader::new(file).lines().enumerate() {
|
||||||
|
line_count += 1;
|
||||||
match base64::decode(line.unwrap().trim()) {
|
match base64::decode(line.unwrap().trim()) {
|
||||||
Ok(payload) => result.push(payload),
|
Ok(payload) => result.push(payload),
|
||||||
Err(err) => println!("error decoding line {}", err),
|
Err(err) => println!("error decoding line {file_name}:{line_count} - {err}"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
println!("reslt len: {}", result.len());
|
println!("result len: {}", result.len());
|
||||||
return result;
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn save_to_npy<T: npyz::Serialize + AutoSerialize>(data: &[T], save_to: String) {
|
pub fn save_to_npy<T: npyz::Serialize + AutoSerialize>(data: &[T], save_to: String) {
|
||||||
let mut writer = WriteOptions::new()
|
let mut writer = WriteOptions::new()
|
||||||
.default_dtype()
|
.default_dtype()
|
||||||
|
|
Loading…
Reference in New Issue