Compare commits

..

6 Commits

Author SHA1 Message Date
dzhao 36588c650e
Merge pull request #452 from pouriya/refactor-dr_transform
Refactor `navi/dr_transform`
2023-04-04 11:06:35 -07:00
dzhao e8147d8e5f
Update README.md 2023-04-04 09:32:40 -07:00
dzhao 9f0afc0ec4
Merge pull request #550 from MrAuro/improve-navi-docs
(docs): Improve README file for Navi
2023-04-04 09:30:11 -07:00
Auro 9115361f00
(docs): Improve README file for Navi 2023-03-31 17:52:31 -07:00
Pouriya Jahanbakhsh ee5e7fc18d feat(navi/dr_transform): add filename:line to file reader error message 2023-04-01 02:33:51 +03:30
Pouriya Jahanbakhsh 2dbdfe173c ref(navi/dr_transform): fix clippy & formatting issues 2023-04-01 02:21:44 +03:30
4 changed files with 144 additions and 176 deletions

View File

@ -1,6 +1,6 @@
# Navi: High-Performance Machine Learning Serving Server in Rust # Navi: High-Performance Machine Learning Serving Server in Rust
Navi is a high-performance, versatile machine learning serving server implemented in Rust, tailored for production usage. It's designed to efficiently serve within the Twitter tech stack, offering top-notch performance while focusing on core features. Navi is a high-performance, versatile machine learning serving server implemented in Rust and tailored for production usage. It's designed to efficiently serve within the Twitter tech stack, offering top-notch performance while focusing on core features.
## Key Features ## Key Features
@ -23,12 +23,14 @@ While Navi's features may not be as comprehensive as its open-source counterpart
- `thrift_bpr_adapter`: generated thrift code for BatchPredictionRequest - `thrift_bpr_adapter`: generated thrift code for BatchPredictionRequest
## Content ## Content
We include all *.rs source code that makes up the main navi binaries for you to examine. The test and benchmark code, as well as configuration files are not included due to data security concerns. We have included all *.rs source code files that make up the main Navi binaries for you to examine. However, we have not included the test and benchmark code, or various configuration files, due to data security concerns.
## Run ## Run
in navi/navi you can run. Note you need to create a models directory and create some versions, preferably using epoch time, e.g., 1679693908377 In navi/navi, you can run the following commands:
- scripts/run_tf2.sh - `scripts/run_tf2.sh` for [TensorFlow](https://www.tensorflow.org/)
- scripts/run_onnx.sh - `scripts/run_onnx.sh` for [Onnx](https://onnx.ai/)
Do note that you need to create a models directory and create some versions, preferably using epoch time, e.g., `1679693908377`.
## Build ## Build
you can adapt the above scripts to build using Cargo You can adapt the above scripts to build using Cargo.

View File

@ -44,6 +44,5 @@ pub struct RenamedFeatures {
} }
pub fn parse(json_str: &str) -> Result<AllConfig, Error> { pub fn parse(json_str: &str) -> Result<AllConfig, Error> {
let all_config: AllConfig = serde_json::from_str(json_str)?; serde_json::from_str(json_str)
return std::result::Result::Ok(all_config);
} }

View File

@ -16,8 +16,7 @@ use segdense::util;
use thrift::protocol::{TBinaryInputProtocol, TSerializable}; use thrift::protocol::{TBinaryInputProtocol, TSerializable};
use thrift::transport::TBufferChannel; use thrift::transport::TBufferChannel;
use crate::{all_config}; use crate::{all_config, all_config::AllConfig};
use crate::all_config::AllConfig;
pub fn log_feature_match( pub fn log_feature_match(
dr: &DataRecord, dr: &DataRecord,
@ -27,26 +26,22 @@ pub fn log_feature_match(
// Note the following algorithm matches features from config using linear search. // Note the following algorithm matches features from config using linear search.
// Also the record source is MinDataRecord. This includes only binary and continous features for now. // Also the record source is MinDataRecord. This includes only binary and continous features for now.
for (feature_id, feature_value) in dr.continuous_features.as_ref().unwrap().into_iter() { for (feature_id, feature_value) in dr.continuous_features.as_ref().unwrap() {
debug!( debug!(
"{} - Continous Datarecord => Feature ID: {}, Feature value: {}", "{dr_type} - Continuous Datarecord => Feature ID: {feature_id}, Feature value: {feature_value}"
dr_type, feature_id, feature_value
); );
for input_feature in &seg_dense_config.cont.input_features { for input_feature in &seg_dense_config.cont.input_features {
if input_feature.feature_id == *feature_id { if input_feature.feature_id == *feature_id {
debug!("Matching input feature: {:?}", input_feature) debug!("Matching input feature: {input_feature:?}")
} }
} }
} }
for feature_id in dr.binary_features.as_ref().unwrap().into_iter() { for feature_id in dr.binary_features.as_ref().unwrap() {
debug!( debug!("{dr_type} - Binary Datarecord => Feature ID: {feature_id}");
"{} - Binary Datarecord => Feature ID: {}",
dr_type, feature_id
);
for input_feature in &seg_dense_config.binary.input_features { for input_feature in &seg_dense_config.binary.input_features {
if input_feature.feature_id == *feature_id { if input_feature.feature_id == *feature_id {
debug!("Found input feature: {:?}", input_feature) debug!("Found input feature: {input_feature:?}")
} }
} }
} }
@ -96,15 +91,13 @@ impl BatchPredictionRequestToTorchTensorConverter {
reporting_feature_ids: Vec<(i64, &str)>, reporting_feature_ids: Vec<(i64, &str)>,
register_metric_fn: Option<impl Fn(&HistogramVec)>, register_metric_fn: Option<impl Fn(&HistogramVec)>,
) -> BatchPredictionRequestToTorchTensorConverter { ) -> BatchPredictionRequestToTorchTensorConverter {
let all_config_path = format!("{}/{}/all_config.json", model_dir, model_version); let all_config_path = format!("{model_dir}/{model_version}/all_config.json");
let seg_dense_config_path = format!( let seg_dense_config_path =
"{}/{}/segdense_transform_spec_home_recap_2022.json", format!("{model_dir}/{model_version}/segdense_transform_spec_home_recap_2022.json");
model_dir, model_version
);
let seg_dense_config = util::load_config(&seg_dense_config_path); let seg_dense_config = util::load_config(&seg_dense_config_path);
let all_config = all_config::parse( let all_config = all_config::parse(
&fs::read_to_string(&all_config_path) &fs::read_to_string(&all_config_path)
.unwrap_or_else(|error| panic!("error loading all_config.json - {}", error)), .unwrap_or_else(|error| panic!("error loading all_config.json - {error}")),
) )
.unwrap(); .unwrap();
@ -138,11 +131,11 @@ impl BatchPredictionRequestToTorchTensorConverter {
let (discrete_feature_metrics, continuous_feature_metrics) = METRICS.get_or_init(|| { let (discrete_feature_metrics, continuous_feature_metrics) = METRICS.get_or_init(|| {
let discrete = HistogramVec::new( let discrete = HistogramVec::new(
HistogramOpts::new(":navi:feature_id:discrete", "Discrete Feature ID values") HistogramOpts::new(":navi:feature_id:discrete", "Discrete Feature ID values")
.buckets(Vec::from(&[ .buckets(Vec::from([
0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0, 0.0f64, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0,
120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0, 190.0, 200.0, 250.0, 120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0, 190.0, 200.0, 250.0,
300.0, 500.0, 1000.0, 10000.0, 100000.0, 300.0, 500.0, 1000.0, 10000.0, 100000.0,
] as &'static [f64])), ])),
&["feature_id"], &["feature_id"],
) )
.expect("metric cannot be created"); .expect("metric cannot be created");
@ -151,18 +144,18 @@ impl BatchPredictionRequestToTorchTensorConverter {
":navi:feature_id:continuous", ":navi:feature_id:continuous",
"continuous Feature ID values", "continuous Feature ID values",
) )
.buckets(Vec::from(&[ .buckets(Vec::from([
0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0, 120.0, 0.0f64, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0,
130.0, 140.0, 150.0, 160.0, 170.0, 180.0, 190.0, 200.0, 250.0, 300.0, 500.0, 120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0, 190.0, 200.0, 250.0, 300.0,
1000.0, 10000.0, 100000.0, 500.0, 1000.0, 10000.0, 100000.0,
] as &'static [f64])), ])),
&["feature_id"], &["feature_id"],
) )
.expect("metric cannot be created"); .expect("metric cannot be created");
register_metric_fn.map(|r| { if let Some(r) = register_metric_fn {
r(&discrete); r(&discrete);
r(&continuous); r(&continuous);
}); }
(discrete, continuous) (discrete, continuous)
}); });
@ -171,16 +164,13 @@ impl BatchPredictionRequestToTorchTensorConverter {
for (feature_id, feature_type) in reporting_feature_ids.iter() { for (feature_id, feature_type) in reporting_feature_ids.iter() {
match *feature_type { match *feature_type {
"discrete" => discrete_features_to_report.insert(feature_id.clone()), "discrete" => discrete_features_to_report.insert(*feature_id),
"continuous" => continuous_features_to_report.insert(feature_id.clone()), "continuous" => continuous_features_to_report.insert(*feature_id),
_ => panic!( _ => panic!("Invalid feature type {feature_type} for reporting metrics!"),
"Invalid feature type {} for reporting metrics!",
feature_type
),
}; };
} }
return BatchPredictionRequestToTorchTensorConverter { BatchPredictionRequestToTorchTensorConverter {
all_config, all_config,
seg_dense_config, seg_dense_config,
all_config_path, all_config_path,
@ -193,7 +183,7 @@ impl BatchPredictionRequestToTorchTensorConverter {
continuous_features_to_report, continuous_features_to_report,
discrete_feature_metrics, discrete_feature_metrics,
continuous_feature_metrics, continuous_feature_metrics,
}; }
} }
fn get_feature_id(feature_name: &str, seg_dense_config: &Root) -> i64 { fn get_feature_id(feature_name: &str, seg_dense_config: &Root) -> i64 {
@ -203,7 +193,7 @@ impl BatchPredictionRequestToTorchTensorConverter {
return feature.feature_id; return feature.feature_id;
} }
} }
return -1; -1
} }
fn parse_batch_prediction_request(bytes: Vec<u8>) -> BatchPredictionRequest { fn parse_batch_prediction_request(bytes: Vec<u8>) -> BatchPredictionRequest {
@ -211,7 +201,7 @@ impl BatchPredictionRequestToTorchTensorConverter {
let mut bc = TBufferChannel::with_capacity(bytes.len(), 0); let mut bc = TBufferChannel::with_capacity(bytes.len(), 0);
bc.set_readable_bytes(&bytes); bc.set_readable_bytes(&bytes);
let mut protocol = TBinaryInputProtocol::new(bc, true); let mut protocol = TBinaryInputProtocol::new(bc, true);
return BatchPredictionRequest::read_from_in_protocol(&mut protocol).unwrap(); BatchPredictionRequest::read_from_in_protocol(&mut protocol).unwrap()
} }
fn get_embedding_tensors( fn get_embedding_tensors(
@ -228,9 +218,9 @@ impl BatchPredictionRequestToTorchTensorConverter {
let mut working_set = vec![0 as f32; total_size]; let mut working_set = vec![0 as f32; total_size];
let mut bpr_start = 0; let mut bpr_start = 0;
for (bpr, &bpr_end) in bprs.iter().zip(batch_size) { for (bpr, &bpr_end) in bprs.iter().zip(batch_size) {
if bpr.common_features.is_some() { if bpr.common_features.is_some()
if bpr.common_features.as_ref().unwrap().tensors.is_some() { && bpr.common_features.as_ref().unwrap().tensors.is_some()
if bpr && bpr
.common_features .common_features
.as_ref() .as_ref()
.unwrap() .unwrap()
@ -268,8 +258,6 @@ impl BatchPredictionRequestToTorchTensorConverter {
} }
} }
} }
}
}
// find the feature in individual feature list and add to corresponding batch. // find the feature in individual feature list and add to corresponding batch.
for (index, datarecord) in bpr.individual_features_list.iter().enumerate() { for (index, datarecord) in bpr.individual_features_list.iter().enumerate() {
if datarecord.tensors.is_some() if datarecord.tensors.is_some()
@ -300,7 +288,7 @@ impl BatchPredictionRequestToTorchTensorConverter {
} }
bpr_start = bpr_end; bpr_start = bpr_end;
} }
return Array2::<f32>::from_shape_vec([rows, cols], working_set).unwrap(); Array2::<f32>::from_shape_vec([rows, cols], working_set).unwrap()
} }
// Todo : Refactor, create a generic version with different type and field accessors // Todo : Refactor, create a generic version with different type and field accessors
@ -310,9 +298,9 @@ impl BatchPredictionRequestToTorchTensorConverter {
// (INT64 --> INT64, DataRecord.discrete_feature) // (INT64 --> INT64, DataRecord.discrete_feature)
fn get_continuous(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor { fn get_continuous(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor {
// These need to be part of model schema // These need to be part of model schema
let rows: usize = batch_ends[batch_ends.len() - 1]; let rows = batch_ends[batch_ends.len() - 1];
let cols: usize = 5293; let cols = 5293;
let full_size: usize = (rows * cols).try_into().unwrap(); let full_size = rows * cols;
let default_val = f32::NAN; let default_val = f32::NAN;
let mut tensor = vec![default_val; full_size]; let mut tensor = vec![default_val; full_size];
@ -337,55 +325,48 @@ impl BatchPredictionRequestToTorchTensorConverter {
.unwrap(); .unwrap();
for feature in common_features { for feature in common_features {
match self.feature_mapper.get(feature.0) { if let Some(f_info) = self.feature_mapper.get(feature.0) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize; let idx = f_info.index_within_tensor as usize;
if idx < cols { if idx < cols {
// Set value in each row // Set value in each row
for r in bpr_start..bpr_end { for r in bpr_start..bpr_end {
let flat_index: usize = (r * cols + idx).try_into().unwrap(); let flat_index = r * cols + idx;
tensor[flat_index] = feature.1.into_inner() as f32; tensor[flat_index] = feature.1.into_inner() as f32;
} }
} }
} }
None => (),
}
if self.continuous_features_to_report.contains(feature.0) { if self.continuous_features_to_report.contains(feature.0) {
self.continuous_feature_metrics self.continuous_feature_metrics
.with_label_values(&[feature.0.to_string().as_str()]) .with_label_values(&[feature.0.to_string().as_str()])
.observe(feature.1.into_inner() as f64) .observe(feature.1.into_inner())
} else if self.discrete_features_to_report.contains(feature.0) { } else if self.discrete_features_to_report.contains(feature.0) {
self.discrete_feature_metrics self.discrete_feature_metrics
.with_label_values(&[feature.0.to_string().as_str()]) .with_label_values(&[feature.0.to_string().as_str()])
.observe(feature.1.into_inner() as f64) .observe(feature.1.into_inner())
} }
} }
} }
// Process the batch of datarecords // Process the batch of datarecords
for r in bpr_start..bpr_end { for r in bpr_start..bpr_end {
let dr: &DataRecord = let dr: &DataRecord = &bpr.individual_features_list[r - bpr_start];
&bpr.individual_features_list[usize::try_from(r - bpr_start).unwrap()];
if dr.continuous_features.is_some() { if dr.continuous_features.is_some() {
for feature in dr.continuous_features.as_ref().unwrap() { for feature in dr.continuous_features.as_ref().unwrap() {
match self.feature_mapper.get(&feature.0) { if let Some(f_info) = self.feature_mapper.get(feature.0) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize; let idx = f_info.index_within_tensor as usize;
let flat_index: usize = (r * cols + idx).try_into().unwrap(); let flat_index = r * cols + idx;
if flat_index < tensor.len() && idx < cols { if flat_index < tensor.len() && idx < cols {
tensor[flat_index] = feature.1.into_inner() as f32; tensor[flat_index] = feature.1.into_inner() as f32;
} }
} }
None => (),
}
if self.continuous_features_to_report.contains(feature.0) { if self.continuous_features_to_report.contains(feature.0) {
self.continuous_feature_metrics self.continuous_feature_metrics
.with_label_values(&[feature.0.to_string().as_str()]) .with_label_values(&[feature.0.to_string().as_str()])
.observe(feature.1.into_inner() as f64) .observe(feature.1.into_inner())
} else if self.discrete_features_to_report.contains(feature.0) { } else if self.discrete_features_to_report.contains(feature.0) {
self.discrete_feature_metrics self.discrete_feature_metrics
.with_label_values(&[feature.0.to_string().as_str()]) .with_label_values(&[feature.0.to_string().as_str()])
.observe(feature.1.into_inner() as f64) .observe(feature.1.into_inner())
} }
} }
} }
@ -393,22 +374,19 @@ impl BatchPredictionRequestToTorchTensorConverter {
bpr_start = bpr_end; bpr_start = bpr_end;
} }
return InputTensor::FloatTensor( InputTensor::FloatTensor(
Array2::<f32>::from_shape_vec( Array2::<f32>::from_shape_vec([rows, cols], tensor)
[rows.try_into().unwrap(), cols.try_into().unwrap()],
tensor,
)
.unwrap() .unwrap()
.into_dyn(), .into_dyn(),
); )
} }
fn get_binary(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor { fn get_binary(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor {
// These need to be part of model schema // These need to be part of model schema
let rows: usize = batch_ends[batch_ends.len() - 1]; let rows = batch_ends[batch_ends.len() - 1];
let cols: usize = 149; let cols = 149;
let full_size: usize = (rows * cols).try_into().unwrap(); let full_size = rows * cols;
let default_val: i64 = 0; let default_val = 0;
let mut v = vec![default_val; full_size]; let mut v = vec![default_val; full_size];
@ -432,55 +410,48 @@ impl BatchPredictionRequestToTorchTensorConverter {
.unwrap(); .unwrap();
for feature in common_features { for feature in common_features {
match self.feature_mapper.get(feature) { if let Some(f_info) = self.feature_mapper.get(feature) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize; let idx = f_info.index_within_tensor as usize;
if idx < cols { if idx < cols {
// Set value in each row // Set value in each row
for r in bpr_start..bpr_end { for r in bpr_start..bpr_end {
let flat_index: usize = (r * cols + idx).try_into().unwrap(); let flat_index = r * cols + idx;
v[flat_index] = 1; v[flat_index] = 1;
} }
} }
} }
None => (),
}
} }
} }
// Process the batch of datarecords // Process the batch of datarecords
for r in bpr_start..bpr_end { for r in bpr_start..bpr_end {
let dr: &DataRecord = let dr: &DataRecord = &bpr.individual_features_list[r - bpr_start];
&bpr.individual_features_list[usize::try_from(r - bpr_start).unwrap()];
if dr.binary_features.is_some() { if dr.binary_features.is_some() {
for feature in dr.binary_features.as_ref().unwrap() { for feature in dr.binary_features.as_ref().unwrap() {
match self.feature_mapper.get(&feature) { if let Some(f_info) = self.feature_mapper.get(feature) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize; let idx = f_info.index_within_tensor as usize;
let flat_index: usize = (r * cols + idx).try_into().unwrap(); let flat_index = r * cols + idx;
v[flat_index] = 1; v[flat_index] = 1;
} }
None => (),
}
} }
} }
} }
bpr_start = bpr_end; bpr_start = bpr_end;
} }
return InputTensor::Int64Tensor( InputTensor::Int64Tensor(
Array2::<i64>::from_shape_vec([rows.try_into().unwrap(), cols.try_into().unwrap()], v) Array2::<i64>::from_shape_vec([rows, cols], v)
.unwrap() .unwrap()
.into_dyn(), .into_dyn(),
); )
} }
#[allow(dead_code)] #[allow(dead_code)]
fn get_discrete(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor { fn get_discrete(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor {
// These need to be part of model schema // These need to be part of model schema
let rows: usize = batch_ends[batch_ends.len() - 1]; let rows = batch_ends[batch_ends.len() - 1];
let cols: usize = 320; let cols = 320;
let full_size: usize = (rows * cols).try_into().unwrap(); let full_size = rows * cols;
let default_val: i64 = 0; let default_val = 0;
let mut v = vec![default_val; full_size]; let mut v = vec![default_val; full_size];
@ -504,19 +475,16 @@ impl BatchPredictionRequestToTorchTensorConverter {
.unwrap(); .unwrap();
for feature in common_features { for feature in common_features {
match self.feature_mapper.get(feature.0) { if let Some(f_info) = self.feature_mapper.get(feature.0) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize; let idx = f_info.index_within_tensor as usize;
if idx < cols { if idx < cols {
// Set value in each row // Set value in each row
for r in bpr_start..bpr_end { for r in bpr_start..bpr_end {
let flat_index: usize = (r * cols + idx).try_into().unwrap(); let flat_index = r * cols + idx;
v[flat_index] = *feature.1; v[flat_index] = *feature.1;
} }
} }
} }
None => (),
}
if self.discrete_features_to_report.contains(feature.0) { if self.discrete_features_to_report.contains(feature.0) {
self.discrete_feature_metrics self.discrete_feature_metrics
.with_label_values(&[feature.0.to_string().as_str()]) .with_label_values(&[feature.0.to_string().as_str()])
@ -527,19 +495,16 @@ impl BatchPredictionRequestToTorchTensorConverter {
// Process the batch of datarecords // Process the batch of datarecords
for r in bpr_start..bpr_end { for r in bpr_start..bpr_end {
let dr: &DataRecord = &bpr.individual_features_list[usize::try_from(r).unwrap()]; let dr: &DataRecord = &bpr.individual_features_list[r];
if dr.discrete_features.is_some() { if dr.discrete_features.is_some() {
for feature in dr.discrete_features.as_ref().unwrap() { for feature in dr.discrete_features.as_ref().unwrap() {
match self.feature_mapper.get(&feature.0) { if let Some(f_info) = self.feature_mapper.get(feature.0) {
Some(f_info) => {
let idx = f_info.index_within_tensor as usize; let idx = f_info.index_within_tensor as usize;
let flat_index: usize = (r * cols + idx).try_into().unwrap(); let flat_index = r * cols + idx;
if flat_index < v.len() && idx < cols { if flat_index < v.len() && idx < cols {
v[flat_index] = *feature.1; v[flat_index] = *feature.1;
} }
} }
None => (),
}
if self.discrete_features_to_report.contains(feature.0) { if self.discrete_features_to_report.contains(feature.0) {
self.discrete_feature_metrics self.discrete_feature_metrics
.with_label_values(&[feature.0.to_string().as_str()]) .with_label_values(&[feature.0.to_string().as_str()])
@ -550,11 +515,11 @@ impl BatchPredictionRequestToTorchTensorConverter {
} }
bpr_start = bpr_end; bpr_start = bpr_end;
} }
return InputTensor::Int64Tensor( InputTensor::Int64Tensor(
Array2::<i64>::from_shape_vec([rows.try_into().unwrap(), cols.try_into().unwrap()], v) Array2::<i64>::from_shape_vec([rows, cols], v)
.unwrap() .unwrap()
.into_dyn(), .into_dyn(),
); )
} }
fn get_user_embedding( fn get_user_embedding(
@ -604,7 +569,7 @@ impl Converter for BatchPredictionRequestToTorchTensorConverter {
.map(|bpr| bpr.individual_features_list.len()) .map(|bpr| bpr.individual_features_list.len())
.scan(0usize, |acc, e| { .scan(0usize, |acc, e| {
//running total //running total
*acc = *acc + e; *acc += e;
Some(*acc) Some(*acc)
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();

View File

@ -9,15 +9,17 @@ use std::{
pub fn load_batch_prediction_request_base64(file_name: &str) -> Vec<Vec<u8>> { pub fn load_batch_prediction_request_base64(file_name: &str) -> Vec<Vec<u8>> {
let file = File::open(file_name).expect("could not read file"); let file = File::open(file_name).expect("could not read file");
let mut result = vec![]; let mut result = vec![];
for line in io::BufReader::new(file).lines() { for (mut line_count, line) in io::BufReader::new(file).lines().enumerate() {
line_count += 1;
match base64::decode(line.unwrap().trim()) { match base64::decode(line.unwrap().trim()) {
Ok(payload) => result.push(payload), Ok(payload) => result.push(payload),
Err(err) => println!("error decoding line {}", err), Err(err) => println!("error decoding line {file_name}:{line_count} - {err}"),
} }
} }
println!("reslt len: {}", result.len()); println!("result len: {}", result.len());
return result; result
} }
pub fn save_to_npy<T: npyz::Serialize + AutoSerialize>(data: &[T], save_to: String) { pub fn save_to_npy<T: npyz::Serialize + AutoSerialize>(data: &[T], save_to: String) {
let mut writer = WriteOptions::new() let mut writer = WriteOptions::new()
.default_dtype() .default_dtype()