From 8e7700e81c81f3d59d37be136e3aaa4186fc2eeb Mon Sep 17 00:00:00 2001 From: Massaki Archambault Date: Thu, 14 Oct 2021 15:24:06 -0400 Subject: [PATCH] seplit reward by type --- contrib/dashboard/records.yaml | 19 +++-- exporter.go | 141 ++++++++++++++++++++++++--------- heliumapi/accounts.go | 29 +------ heliumapi/hotspots.go | 64 ++++++++------- 4 files changed, 156 insertions(+), 97 deletions(-) diff --git a/contrib/dashboard/records.yaml b/contrib/dashboard/records.yaml index 25e0688..8ebcecd 100644 --- a/contrib/dashboard/records.yaml +++ b/contrib/dashboard/records.yaml @@ -2,11 +2,18 @@ groups: - name: helium-blockchain-exporter-rules interval: 1m rules: - - record: hotspot:helium_hotspot_rewards_hnt:increase15m - expr: sum by (account, hotspot, hotspot_name) (increase(helium_hotspot_rewards_hnt_total[15m])) + # Accounts + - record: account:helium_account_deposits_hnt:increase15m + expr: sum by (account) (increase(helium_account_deposits_hnt_total[15m])) + - record: account:helium_account_withdrawals_hnt:increase15m + expr: -sum by (account) (increase(helium_account_withdrawals_hnt_total[15m])) + + # Hotspots - record: type:helium_hotspot_activity:floor_increase15m expr: sum by (account, hotspot, hotspot_name, type) (floor(increase(helium_hotspot_activity_total[15m]))) - - record: hotspot:helium_hotspot_rewards_hnt:increase1d - expr: sum by (account, hotspot, hotspot_name) (increase(helium_hotspot_rewards_hnt_total[1d])) - - record: hotspot:helium_hotspot_rewards_hnt:increase1w - expr: sum by (account, hotspot, hotspot_name) (increase(helium_hotspot_rewards_hnt_total[1w])) + - record: hotspot:helium_hotspot_rewards_hnt:increase1h + expr: sum by (account, hotspot, hotspot_name) (increase(helium_hotspot_rewards_hnt_total[1h])) + - record: hotspot:helium_hotspot_rewards_hnt:increase24h + expr: sum by (account, hotspot, hotspot_name) (increase(helium_hotspot_rewards_hnt_total[24h])) + - record: hotspot:helium_hotspot_rewards_hnt:increase7d + expr: sum by (account, hotspot, hotspot_name) (increase(helium_hotspot_rewards_hnt_total[7d])) diff --git a/exporter.go b/exporter.go index 54dcd32..dab1b24 100644 --- a/exporter.go +++ b/exporter.go @@ -43,26 +43,55 @@ func NewExporter(accountAddress []string) (*Exporter, error) { // Account represents a helium account type Account struct { - Address string - Tx AccountTx + Address string + UpdateLock sync.Mutex + LastUpdate time.Time + RewardsTotal map[string]int + Tx AccountTx + Hotspots map[string]Hotspot } func NewAccount(address string) Account { return Account{ - Address: address, + Address: address, + LastUpdate: time.Now(), + RewardsTotal: map[string]int{ + "data_credits": 0, + "poc_challengers": 0, + "poc_challengees": 0, + "poc_witnesses": 0, + }, Tx: AccountTx{ DepositTotal: 0, WithdrawalTotal: 0, - LastUpdate: time.Now(), }, + Hotspots: map[string]Hotspot{}, } } type AccountTx struct { DepositTotal int WithdrawalTotal int - LastUpdate time.Time - UpdateLock sync.Mutex +} + +// Hotspot represents a hotspot +type Hotspot struct { + Address string + LastUpdate time.Time + RewardsTotal map[string]int +} + +func NewHotspot(address string) Hotspot { + return Hotspot{ + Address: address, + LastUpdate: time.Now(), + RewardsTotal: map[string]int{ + "data_credits": 0, + "poc_challengers": 0, + "poc_challengees": 0, + "poc_witnesses": 0, + }, + } } const ( @@ -191,7 +220,7 @@ var ( prometheus.NewDesc( prometheus.BuildFQName(namespace, "account", "rewards_hnt_total"), "The number of HNT token rewarded to an account.", - commonAccountLabels, nil, + append(commonAccountLabels, "type"), nil, ), prometheus.CounterValue, } @@ -305,7 +334,7 @@ var ( prometheus.NewDesc( prometheus.BuildFQName(namespace, "hotspot", "rewards_hnt_total"), "The number of HNT token rewarded to a hotspot.", - commonHotspotLabels, nil, + append(commonHotspotLabels, "type"), nil, ), prometheus.CounterValue, } @@ -363,10 +392,9 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) { go e.collectOracleMetrics(wg, ch) go e.collectStatsMetrics(wg, ch) for i := range e.Accounts { - wg.Add(5) + wg.Add(4) go e.collectAccountMetrics(wg, ch, &e.Accounts[i]) go e.collectAccountActivityMetrics(wg, ch, &e.Accounts[i]) - go e.collectAccountRewardsTotalMetrics(wg, ch, &e.Accounts[i]) go e.collectAccountTransactionsMetrics(wg, ch, &e.Accounts[i]) go e.collectHotspotMetrics(wg, ch, &e.Accounts[i]) @@ -465,22 +493,6 @@ func (e *Exporter) collectAccountActivityMetrics(wg *sync.WaitGroup, ch chan<- p } } -// collectAccountRewardsTotalMetrics collect the total rewards accumulated by an account from the helium api -func (e *Exporter) collectAccountRewardsTotalMetrics(wg *sync.WaitGroup, ch chan<- prometheus.Metric, account *Account) { - defer wg.Done() - - accountRewardTotalsForAddress, err := heliumapi.GetRewardTotalsForAccount(account.Address, &e.StartTime, nil) - if err != nil { - log.Println(err) - return - } - - ch <- prometheus.MustNewConstMetric( - accountRewardsHnt.Desc, accountRewardsHnt.Type, accountRewardTotalsForAddress.Sum/blockchain_hnt_factor, - account.Address, - ) -} - // collectAccountTransactionsMetrics collect the total deposited/withdrawn by an account from the helium api func (e *Exporter) collectAccountTransactionsMetrics(wg *sync.WaitGroup, ch chan<- prometheus.Metric, account *Account) { defer wg.Done() @@ -502,14 +514,14 @@ func (e *Exporter) collectAccountTransactionsMetrics(wg *sync.WaitGroup, ch chan // We can only want to allow a single instance of the routine doing // calculations on the deposited and widthdrawn total. - account.Tx.UpdateLock.Lock() - defer account.Tx.UpdateLock.Unlock() + account.UpdateLock.Lock() + defer account.UpdateLock.Unlock() // We want to keep in memory the timestamp of the last activity we // received from the api. We cannot do something naive like [lastscrape, now] // because the api can take a few seconds to sync with the chain and // we can miss some activities by doing it that way. - lastActivityTime := account.Tx.LastUpdate.Unix() + lastActivityTime := account.LastUpdate.Unix() updateLastActivityTime := func(newTime int64) { if lastActivityTime < newTime { lastActivityTime = newTime @@ -517,7 +529,7 @@ func (e *Exporter) collectAccountTransactionsMetrics(wg *sync.WaitGroup, ch chan } // add 1 second to the last update time to avoid querying the same activity twice - minTime := account.Tx.LastUpdate.Add(time.Second * 1) + minTime := account.LastUpdate.Add(time.Second * 1) // Explicitly set max_time, to avoid issues with server-side caching maxTime := time.Now() activities, err := heliumapi.GetActivityForAccount(account.Address, activityTypes, &minTime, &maxTime) @@ -565,12 +577,14 @@ func (e *Exporter) collectAccountTransactionsMetrics(wg *sync.WaitGroup, ch chan } for _, activity := range activities.RewardsV1 { for _, reward := range activity.Rewards { + account.RewardsTotal[reward.Type] += reward.Amount account.Tx.DepositTotal += reward.Amount } updateLastActivityTime(activity.Time) } for _, activity := range activities.RewardsV2 { for _, reward := range activity.Rewards { + account.RewardsTotal[reward.Type] += reward.Amount account.Tx.DepositTotal += reward.Amount } updateLastActivityTime(activity.Time) @@ -595,7 +609,14 @@ func (e *Exporter) collectAccountTransactionsMetrics(wg *sync.WaitGroup, ch chan account.Tx.WithdrawalTotal += activity.StakeAmount updateLastActivityTime(activity.Time) } - account.Tx.LastUpdate = time.Unix(lastActivityTime, 0) + account.LastUpdate = time.Unix(lastActivityTime, 0) + + for rType, rTotal := range account.RewardsTotal { + ch <- prometheus.MustNewConstMetric( + accountRewardsHnt.Desc, accountRewardsHnt.Type, float64(rTotal)/blockchain_hnt_factor, + account.Address, rType, + ) + } ch <- prometheus.MustNewConstMetric( accountDepositsHnt.Desc, accountDepositsHnt.Type, float64(account.Tx.DepositTotal)/blockchain_hnt_factor, @@ -742,16 +763,64 @@ func (e *Exporter) collectHotspotActivityMetrics(wg *sync.WaitGroup, ch chan<- p func (e *Exporter) collectHotspotRewardsMetrics(wg *sync.WaitGroup, ch chan<- prometheus.Metric, account *Account, hotspotData heliumapi.Hotspot) { defer wg.Done() - hotspotRewardTotalsForAddress, err := heliumapi.GetRewardsTotalForHotspot(hotspotData.Address, &e.StartTime, nil) + // We optimize our queries by asking the api only the activities we car about. + activityTypes := []string{ + "rewards_v1", + "rewards_v2", + } + + // TODO: optimize locks + account.UpdateLock.Lock() + defer account.UpdateLock.Unlock() + + hotspot, ok := account.Hotspots[hotspotData.Address] + if !ok { + hotspot = NewHotspot(hotspotData.Address) + account.Hotspots[hotspotData.Address] = hotspot + } + + // We want to keep in memory the timestamp of the last activity we + // received from the api. We cannot do something naive like [lastscrape, now] + // because the api can take a few seconds to sync with the chain and + // we can miss some activities by doing it that way. + lastActivityTime := hotspot.LastUpdate.Unix() + updateLastActivityTime := func(newTime int64) { + if lastActivityTime < newTime { + lastActivityTime = newTime + } + } + + // add 1 second to the last update time to avoid querying the same activity twice + minTime := hotspot.LastUpdate.Add(time.Second * 1) + // Explicitly set max_time, to avoid issues with server-side caching + maxTime := time.Now() + activities, err := heliumapi.GetHotspotActivity(hotspot.Address, activityTypes, &minTime, &maxTime) if err != nil { log.Println(err) return } - ch <- prometheus.MustNewConstMetric( - hotspotRewardsHnt.Desc, hotspotRewardsHnt.Type, hotspotRewardTotalsForAddress.Sum/blockchain_hnt_factor, - account.Address, hotspotData.Address, hotspotData.Name, - ) + for _, activity := range activities.RewardsV1 { + for _, reward := range activity.Rewards { + hotspot.RewardsTotal[reward.Type] += reward.Amount + } + updateLastActivityTime(activity.Time) + } + for _, activity := range activities.RewardsV2 { + for _, reward := range activity.Rewards { + hotspot.RewardsTotal[reward.Type] += reward.Amount + } + updateLastActivityTime(activity.Time) + } + hotspot.LastUpdate = time.Unix(lastActivityTime, 0) + + for rType, rTotal := range hotspot.RewardsTotal { + ch <- prometheus.MustNewConstMetric( + hotspotRewardsHnt.Desc, hotspotRewardsHnt.Type, float64(rTotal)/blockchain_hnt_factor, + account.Address, hotspot.Address, hotspotData.Name, rType, + ) + } + account.Hotspots[hotspot.Address] = hotspot } func main() { diff --git a/heliumapi/accounts.go b/heliumapi/accounts.go index 92caaad..95cd7c4 100644 --- a/heliumapi/accounts.go +++ b/heliumapi/accounts.go @@ -34,7 +34,7 @@ func GetActivityForAccount(account string, filterTypes []string, minTime *time.T path := "/v1/accounts/" + account + "/activity" params := map[string]string{} if filterTypes != nil { - params["min_time"] = strings.Join(filterTypes, ",") + params["filter_types"] = strings.Join(filterTypes, ",") } if minTime != nil { params["min_time"] = minTime.UTC().Format(time.RFC3339) @@ -83,33 +83,6 @@ func GetActivityCountsForAccount(account string) (*ActivityCounts, error) { return &respobject.Data, nil } -// query https://docs.helium.com/api/blockchain/accounts#reward-totals-for-an-account -func GetRewardTotalsForAccount(account string, minTime *time.Time, maxTime *time.Time) (*RewardTotal, error) { - path := "/v1/accounts/" + account + "/rewards/sum" - params := map[string]string{} - if minTime != nil { - params["min_time"] = minTime.UTC().Format(time.RFC3339) - } - if maxTime != nil { - params["max_time"] = maxTime.UTC().Format(time.RFC3339) - } - - // query the api - respBody, err := getHeliumApi(path, ¶ms) - if err != nil { - return nil, err - } - - // unmarshal the response - respobject := RewardTotalResp{} - err = json.Unmarshal(respBody, &respobject) - if err != nil { - return nil, fmt.Errorf("failed to unmarshal response from %v: %v", path, err) - } - - return &respobject.Data, nil -} - // query https://docs.helium.com/api/blockchain/accounts#hotspots-for-account func GetHotspotsForAccount(account string) ([]Hotspot, error) { path := "/v1/accounts/" + account + "/hotspots" diff --git a/heliumapi/hotspots.go b/heliumapi/hotspots.go index 1ef9342..71db9b2 100644 --- a/heliumapi/hotspots.go +++ b/heliumapi/hotspots.go @@ -3,9 +3,46 @@ package heliumapi import ( "encoding/json" "fmt" + "strings" "time" + + "github.com/helium-blockchain-exporter/heliumapi/activity" ) +// query https://docs.helium.com/api/blockchain/hotspots#hotspot-activity +func GetHotspotActivity(hotspot string, filterTypes []string, minTime *time.Time, maxTime *time.Time) (*activity.Activities, error) { + path := "/v1/hotspots/" + hotspot + "/activity" + params := map[string]string{} + if filterTypes != nil { + params["filter_types"] = strings.Join(filterTypes, ",") + } + if minTime != nil { + params["min_time"] = minTime.UTC().Format(time.RFC3339) + } + if maxTime != nil { + params["max_time"] = maxTime.UTC().Format(time.RFC3339) + } + + // query the api + resBodies, err := getHeliumApiWithCursor(path, ¶ms) + if err != nil { + return nil, err + } + + // unmarshal the responses and merge them all together + combinedResp := activity.ActivityResp{} + for _, respBody := range resBodies { + activityResp := activity.ActivityResp{} + err = json.Unmarshal(respBody, &activityResp) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal response from %v: %v", path, err) + } + combinedResp.Data = append(combinedResp.Data, activityResp.Data...) + } + + return activity.NewActivities(combinedResp) +} + // query https://docs.helium.com/api/blockchain/hotspots#hotspots-activity-counts func GetHotspotActivityCounts(hotspot string) (*ActivityCounts, error) { path := "/v1/hotspots/" + hotspot + "/activity/count" @@ -26,33 +63,6 @@ func GetHotspotActivityCounts(hotspot string) (*ActivityCounts, error) { return &respobject.Data, nil } -// query https://docs.helium.com/api/blockchain/hotspots#reward-total-for-a-hotspot -func GetRewardsTotalForHotspot(hotspot string, minTime *time.Time, maxTime *time.Time) (*RewardTotal, error) { - path := "/v1/hotspots/" + hotspot + "/rewards/sum" - params := map[string]string{} - if minTime != nil { - params["min_time"] = minTime.UTC().Format(time.RFC3339) - } - if maxTime != nil { - params["max_time"] = maxTime.UTC().Format(time.RFC3339) - } - - // query the api - respBody, err := getHeliumApi(path, ¶ms) - if err != nil { - return nil, err - } - - // unmarshal the response - respobject := RewardTotalResp{} - err = json.Unmarshal(respBody, &respobject) - if err != nil { - return nil, fmt.Errorf("failed to unmarshal response from %v: %v", path, err) - } - - return &respobject.Data, nil -} - // query https://docs.helium.com/api/blockchain/hotspots#witnesses-for-a-hotspot func GetWitnessesForHotspot(hotspot string) ([]Hotspot, error) { path := "/v1/hotspots/" + hotspot + "/witnesses"