1
0
Fork 0

Compare commits

...

2 Commits

Author SHA1 Message Date
Massaki Archambault 8e7700e81c seplit reward by type
continuous-integration/drone/push Build is passing Details
2021-10-14 15:24:06 -04:00
Massaki Archambault 33e1b52fc0 comment out noisy log 2021-10-12 10:37:03 -04:00
5 changed files with 158 additions and 100 deletions

View File

@ -2,11 +2,18 @@ groups:
- name: helium-blockchain-exporter-rules
interval: 1m
rules:
- record: hotspot:helium_hotspot_rewards_hnt:increase15m
expr: sum by (account, hotspot, hotspot_name) (increase(helium_hotspot_rewards_hnt_total[15m]))
# Accounts
- record: account:helium_account_deposits_hnt:increase15m
expr: sum by (account) (increase(helium_account_deposits_hnt_total[15m]))
- record: account:helium_account_withdrawals_hnt:increase15m
expr: -sum by (account) (increase(helium_account_withdrawals_hnt_total[15m]))
# Hotspots
- record: type:helium_hotspot_activity:floor_increase15m
expr: sum by (account, hotspot, hotspot_name, type) (floor(increase(helium_hotspot_activity_total[15m])))
- record: hotspot:helium_hotspot_rewards_hnt:increase1d
expr: sum by (account, hotspot, hotspot_name) (increase(helium_hotspot_rewards_hnt_total[1d]))
- record: hotspot:helium_hotspot_rewards_hnt:increase1w
expr: sum by (account, hotspot, hotspot_name) (increase(helium_hotspot_rewards_hnt_total[1w]))
- record: hotspot:helium_hotspot_rewards_hnt:increase1h
expr: sum by (account, hotspot, hotspot_name) (increase(helium_hotspot_rewards_hnt_total[1h]))
- record: hotspot:helium_hotspot_rewards_hnt:increase24h
expr: sum by (account, hotspot, hotspot_name) (increase(helium_hotspot_rewards_hnt_total[24h]))
- record: hotspot:helium_hotspot_rewards_hnt:increase7d
expr: sum by (account, hotspot, hotspot_name) (increase(helium_hotspot_rewards_hnt_total[7d]))

View File

@ -43,26 +43,55 @@ func NewExporter(accountAddress []string) (*Exporter, error) {
// Account represents a helium account
type Account struct {
Address string
Tx AccountTx
Address string
UpdateLock sync.Mutex
LastUpdate time.Time
RewardsTotal map[string]int
Tx AccountTx
Hotspots map[string]Hotspot
}
func NewAccount(address string) Account {
return Account{
Address: address,
Address: address,
LastUpdate: time.Now(),
RewardsTotal: map[string]int{
"data_credits": 0,
"poc_challengers": 0,
"poc_challengees": 0,
"poc_witnesses": 0,
},
Tx: AccountTx{
DepositTotal: 0,
WithdrawalTotal: 0,
LastUpdate: time.Now(),
},
Hotspots: map[string]Hotspot{},
}
}
type AccountTx struct {
DepositTotal int
WithdrawalTotal int
LastUpdate time.Time
UpdateLock sync.Mutex
}
// Hotspot represents a hotspot
type Hotspot struct {
Address string
LastUpdate time.Time
RewardsTotal map[string]int
}
func NewHotspot(address string) Hotspot {
return Hotspot{
Address: address,
LastUpdate: time.Now(),
RewardsTotal: map[string]int{
"data_credits": 0,
"poc_challengers": 0,
"poc_challengees": 0,
"poc_witnesses": 0,
},
}
}
const (
@ -191,7 +220,7 @@ var (
prometheus.NewDesc(
prometheus.BuildFQName(namespace, "account", "rewards_hnt_total"),
"The number of HNT token rewarded to an account.",
commonAccountLabels, nil,
append(commonAccountLabels, "type"), nil,
),
prometheus.CounterValue,
}
@ -305,7 +334,7 @@ var (
prometheus.NewDesc(
prometheus.BuildFQName(namespace, "hotspot", "rewards_hnt_total"),
"The number of HNT token rewarded to a hotspot.",
commonHotspotLabels, nil,
append(commonHotspotLabels, "type"), nil,
),
prometheus.CounterValue,
}
@ -363,10 +392,9 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
go e.collectOracleMetrics(wg, ch)
go e.collectStatsMetrics(wg, ch)
for i := range e.Accounts {
wg.Add(5)
wg.Add(4)
go e.collectAccountMetrics(wg, ch, &e.Accounts[i])
go e.collectAccountActivityMetrics(wg, ch, &e.Accounts[i])
go e.collectAccountRewardsTotalMetrics(wg, ch, &e.Accounts[i])
go e.collectAccountTransactionsMetrics(wg, ch, &e.Accounts[i])
go e.collectHotspotMetrics(wg, ch, &e.Accounts[i])
@ -465,22 +493,6 @@ func (e *Exporter) collectAccountActivityMetrics(wg *sync.WaitGroup, ch chan<- p
}
}
// collectAccountRewardsTotalMetrics collect the total rewards accumulated by an account from the helium api
func (e *Exporter) collectAccountRewardsTotalMetrics(wg *sync.WaitGroup, ch chan<- prometheus.Metric, account *Account) {
defer wg.Done()
accountRewardTotalsForAddress, err := heliumapi.GetRewardTotalsForAccount(account.Address, &e.StartTime, nil)
if err != nil {
log.Println(err)
return
}
ch <- prometheus.MustNewConstMetric(
accountRewardsHnt.Desc, accountRewardsHnt.Type, accountRewardTotalsForAddress.Sum/blockchain_hnt_factor,
account.Address,
)
}
// collectAccountTransactionsMetrics collect the total deposited/withdrawn by an account from the helium api
func (e *Exporter) collectAccountTransactionsMetrics(wg *sync.WaitGroup, ch chan<- prometheus.Metric, account *Account) {
defer wg.Done()
@ -502,14 +514,14 @@ func (e *Exporter) collectAccountTransactionsMetrics(wg *sync.WaitGroup, ch chan
// We can only want to allow a single instance of the routine doing
// calculations on the deposited and widthdrawn total.
account.Tx.UpdateLock.Lock()
defer account.Tx.UpdateLock.Unlock()
account.UpdateLock.Lock()
defer account.UpdateLock.Unlock()
// We want to keep in memory the timestamp of the last activity we
// received from the api. We cannot do something naive like [lastscrape, now]
// because the api can take a few seconds to sync with the chain and
// we can miss some activities by doing it that way.
lastActivityTime := account.Tx.LastUpdate.Unix()
lastActivityTime := account.LastUpdate.Unix()
updateLastActivityTime := func(newTime int64) {
if lastActivityTime < newTime {
lastActivityTime = newTime
@ -517,7 +529,7 @@ func (e *Exporter) collectAccountTransactionsMetrics(wg *sync.WaitGroup, ch chan
}
// add 1 second to the last update time to avoid querying the same activity twice
minTime := account.Tx.LastUpdate.Add(time.Second * 1)
minTime := account.LastUpdate.Add(time.Second * 1)
// Explicitly set max_time, to avoid issues with server-side caching
maxTime := time.Now()
activities, err := heliumapi.GetActivityForAccount(account.Address, activityTypes, &minTime, &maxTime)
@ -565,12 +577,14 @@ func (e *Exporter) collectAccountTransactionsMetrics(wg *sync.WaitGroup, ch chan
}
for _, activity := range activities.RewardsV1 {
for _, reward := range activity.Rewards {
account.RewardsTotal[reward.Type] += reward.Amount
account.Tx.DepositTotal += reward.Amount
}
updateLastActivityTime(activity.Time)
}
for _, activity := range activities.RewardsV2 {
for _, reward := range activity.Rewards {
account.RewardsTotal[reward.Type] += reward.Amount
account.Tx.DepositTotal += reward.Amount
}
updateLastActivityTime(activity.Time)
@ -595,7 +609,14 @@ func (e *Exporter) collectAccountTransactionsMetrics(wg *sync.WaitGroup, ch chan
account.Tx.WithdrawalTotal += activity.StakeAmount
updateLastActivityTime(activity.Time)
}
account.Tx.LastUpdate = time.Unix(lastActivityTime, 0)
account.LastUpdate = time.Unix(lastActivityTime, 0)
for rType, rTotal := range account.RewardsTotal {
ch <- prometheus.MustNewConstMetric(
accountRewardsHnt.Desc, accountRewardsHnt.Type, float64(rTotal)/blockchain_hnt_factor,
account.Address, rType,
)
}
ch <- prometheus.MustNewConstMetric(
accountDepositsHnt.Desc, accountDepositsHnt.Type, float64(account.Tx.DepositTotal)/blockchain_hnt_factor,
@ -742,16 +763,64 @@ func (e *Exporter) collectHotspotActivityMetrics(wg *sync.WaitGroup, ch chan<- p
func (e *Exporter) collectHotspotRewardsMetrics(wg *sync.WaitGroup, ch chan<- prometheus.Metric, account *Account, hotspotData heliumapi.Hotspot) {
defer wg.Done()
hotspotRewardTotalsForAddress, err := heliumapi.GetRewardsTotalForHotspot(hotspotData.Address, &e.StartTime, nil)
// We optimize our queries by asking the api only the activities we car about.
activityTypes := []string{
"rewards_v1",
"rewards_v2",
}
// TODO: optimize locks
account.UpdateLock.Lock()
defer account.UpdateLock.Unlock()
hotspot, ok := account.Hotspots[hotspotData.Address]
if !ok {
hotspot = NewHotspot(hotspotData.Address)
account.Hotspots[hotspotData.Address] = hotspot
}
// We want to keep in memory the timestamp of the last activity we
// received from the api. We cannot do something naive like [lastscrape, now]
// because the api can take a few seconds to sync with the chain and
// we can miss some activities by doing it that way.
lastActivityTime := hotspot.LastUpdate.Unix()
updateLastActivityTime := func(newTime int64) {
if lastActivityTime < newTime {
lastActivityTime = newTime
}
}
// add 1 second to the last update time to avoid querying the same activity twice
minTime := hotspot.LastUpdate.Add(time.Second * 1)
// Explicitly set max_time, to avoid issues with server-side caching
maxTime := time.Now()
activities, err := heliumapi.GetHotspotActivity(hotspot.Address, activityTypes, &minTime, &maxTime)
if err != nil {
log.Println(err)
return
}
ch <- prometheus.MustNewConstMetric(
hotspotRewardsHnt.Desc, hotspotRewardsHnt.Type, hotspotRewardTotalsForAddress.Sum/blockchain_hnt_factor,
account.Address, hotspotData.Address, hotspotData.Name,
)
for _, activity := range activities.RewardsV1 {
for _, reward := range activity.Rewards {
hotspot.RewardsTotal[reward.Type] += reward.Amount
}
updateLastActivityTime(activity.Time)
}
for _, activity := range activities.RewardsV2 {
for _, reward := range activity.Rewards {
hotspot.RewardsTotal[reward.Type] += reward.Amount
}
updateLastActivityTime(activity.Time)
}
hotspot.LastUpdate = time.Unix(lastActivityTime, 0)
for rType, rTotal := range hotspot.RewardsTotal {
ch <- prometheus.MustNewConstMetric(
hotspotRewardsHnt.Desc, hotspotRewardsHnt.Type, float64(rTotal)/blockchain_hnt_factor,
account.Address, hotspot.Address, hotspotData.Name, rType,
)
}
account.Hotspots[hotspot.Address] = hotspot
}
func main() {

View File

@ -34,7 +34,7 @@ func GetActivityForAccount(account string, filterTypes []string, minTime *time.T
path := "/v1/accounts/" + account + "/activity"
params := map[string]string{}
if filterTypes != nil {
params["min_time"] = strings.Join(filterTypes, ",")
params["filter_types"] = strings.Join(filterTypes, ",")
}
if minTime != nil {
params["min_time"] = minTime.UTC().Format(time.RFC3339)
@ -83,33 +83,6 @@ func GetActivityCountsForAccount(account string) (*ActivityCounts, error) {
return &respobject.Data, nil
}
// query https://docs.helium.com/api/blockchain/accounts#reward-totals-for-an-account
func GetRewardTotalsForAccount(account string, minTime *time.Time, maxTime *time.Time) (*RewardTotal, error) {
path := "/v1/accounts/" + account + "/rewards/sum"
params := map[string]string{}
if minTime != nil {
params["min_time"] = minTime.UTC().Format(time.RFC3339)
}
if maxTime != nil {
params["max_time"] = maxTime.UTC().Format(time.RFC3339)
}
// query the api
respBody, err := getHeliumApi(path, &params)
if err != nil {
return nil, err
}
// unmarshal the response
respobject := RewardTotalResp{}
err = json.Unmarshal(respBody, &respobject)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal response from %v: %v", path, err)
}
return &respobject.Data, nil
}
// query https://docs.helium.com/api/blockchain/accounts#hotspots-for-account
func GetHotspotsForAccount(account string) ([]Hotspot, error) {
path := "/v1/accounts/" + account + "/hotspots"

View File

@ -3,9 +3,46 @@ package heliumapi
import (
"encoding/json"
"fmt"
"strings"
"time"
"github.com/helium-blockchain-exporter/heliumapi/activity"
)
// query https://docs.helium.com/api/blockchain/hotspots#hotspot-activity
func GetHotspotActivity(hotspot string, filterTypes []string, minTime *time.Time, maxTime *time.Time) (*activity.Activities, error) {
path := "/v1/hotspots/" + hotspot + "/activity"
params := map[string]string{}
if filterTypes != nil {
params["filter_types"] = strings.Join(filterTypes, ",")
}
if minTime != nil {
params["min_time"] = minTime.UTC().Format(time.RFC3339)
}
if maxTime != nil {
params["max_time"] = maxTime.UTC().Format(time.RFC3339)
}
// query the api
resBodies, err := getHeliumApiWithCursor(path, &params)
if err != nil {
return nil, err
}
// unmarshal the responses and merge them all together
combinedResp := activity.ActivityResp{}
for _, respBody := range resBodies {
activityResp := activity.ActivityResp{}
err = json.Unmarshal(respBody, &activityResp)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal response from %v: %v", path, err)
}
combinedResp.Data = append(combinedResp.Data, activityResp.Data...)
}
return activity.NewActivities(combinedResp)
}
// query https://docs.helium.com/api/blockchain/hotspots#hotspots-activity-counts
func GetHotspotActivityCounts(hotspot string) (*ActivityCounts, error) {
path := "/v1/hotspots/" + hotspot + "/activity/count"
@ -26,33 +63,6 @@ func GetHotspotActivityCounts(hotspot string) (*ActivityCounts, error) {
return &respobject.Data, nil
}
// query https://docs.helium.com/api/blockchain/hotspots#reward-total-for-a-hotspot
func GetRewardsTotalForHotspot(hotspot string, minTime *time.Time, maxTime *time.Time) (*RewardTotal, error) {
path := "/v1/hotspots/" + hotspot + "/rewards/sum"
params := map[string]string{}
if minTime != nil {
params["min_time"] = minTime.UTC().Format(time.RFC3339)
}
if maxTime != nil {
params["max_time"] = maxTime.UTC().Format(time.RFC3339)
}
// query the api
respBody, err := getHeliumApi(path, &params)
if err != nil {
return nil, err
}
// unmarshal the response
respobject := RewardTotalResp{}
err = json.Unmarshal(respBody, &respobject)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal response from %v: %v", path, err)
}
return &respobject.Data, nil
}
// query https://docs.helium.com/api/blockchain/hotspots#witnesses-for-a-hotspot
func GetWitnessesForHotspot(hotspot string) ([]Hotspot, error) {
path := "/v1/hotspots/" + hotspot + "/witnesses"

View File

@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
)
@ -48,7 +47,7 @@ func getHeliumApi(path string, params *map[string]string) ([]byte, error) {
}
// query the api
log.Printf("querying %v", req.URL.String())
// log.Printf("querying %v", req.URL.String())
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to query %v: %v", req.URL.String(), err)
@ -98,7 +97,7 @@ func getHeliumApiWithCursor(path string, params *map[string]string) ([][]byte, e
defer resp.Body.Close()
// read the response body and add it to the result array
log.Printf("querying %v", req.URL.String())
// log.Printf("querying %v", req.URL.String())
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body of %v: %v", req.URL.String(), err)