1
0
Fork 0

Compare commits

..

2 Commits

Author SHA1 Message Date
Massaki Archambault 8e7700e81c seplit reward by type
continuous-integration/drone/push Build is passing Details
2021-10-14 15:24:06 -04:00
Massaki Archambault 33e1b52fc0 comment out noisy log 2021-10-12 10:37:03 -04:00
5 changed files with 158 additions and 100 deletions

View File

@ -2,11 +2,18 @@ groups:
- name: helium-blockchain-exporter-rules - name: helium-blockchain-exporter-rules
interval: 1m interval: 1m
rules: rules:
- record: hotspot:helium_hotspot_rewards_hnt:increase15m # Accounts
expr: sum by (account, hotspot, hotspot_name) (increase(helium_hotspot_rewards_hnt_total[15m])) - record: account:helium_account_deposits_hnt:increase15m
expr: sum by (account) (increase(helium_account_deposits_hnt_total[15m]))
- record: account:helium_account_withdrawals_hnt:increase15m
expr: -sum by (account) (increase(helium_account_withdrawals_hnt_total[15m]))
# Hotspots
- record: type:helium_hotspot_activity:floor_increase15m - record: type:helium_hotspot_activity:floor_increase15m
expr: sum by (account, hotspot, hotspot_name, type) (floor(increase(helium_hotspot_activity_total[15m]))) expr: sum by (account, hotspot, hotspot_name, type) (floor(increase(helium_hotspot_activity_total[15m])))
- record: hotspot:helium_hotspot_rewards_hnt:increase1d - record: hotspot:helium_hotspot_rewards_hnt:increase1h
expr: sum by (account, hotspot, hotspot_name) (increase(helium_hotspot_rewards_hnt_total[1d])) expr: sum by (account, hotspot, hotspot_name) (increase(helium_hotspot_rewards_hnt_total[1h]))
- record: hotspot:helium_hotspot_rewards_hnt:increase1w - record: hotspot:helium_hotspot_rewards_hnt:increase24h
expr: sum by (account, hotspot, hotspot_name) (increase(helium_hotspot_rewards_hnt_total[1w])) expr: sum by (account, hotspot, hotspot_name) (increase(helium_hotspot_rewards_hnt_total[24h]))
- record: hotspot:helium_hotspot_rewards_hnt:increase7d
expr: sum by (account, hotspot, hotspot_name) (increase(helium_hotspot_rewards_hnt_total[7d]))

View File

@ -44,25 +44,54 @@ func NewExporter(accountAddress []string) (*Exporter, error) {
// Account represents a helium account // Account represents a helium account
type Account struct { type Account struct {
Address string Address string
UpdateLock sync.Mutex
LastUpdate time.Time
RewardsTotal map[string]int
Tx AccountTx Tx AccountTx
Hotspots map[string]Hotspot
} }
func NewAccount(address string) Account { func NewAccount(address string) Account {
return Account{ return Account{
Address: address, Address: address,
LastUpdate: time.Now(),
RewardsTotal: map[string]int{
"data_credits": 0,
"poc_challengers": 0,
"poc_challengees": 0,
"poc_witnesses": 0,
},
Tx: AccountTx{ Tx: AccountTx{
DepositTotal: 0, DepositTotal: 0,
WithdrawalTotal: 0, WithdrawalTotal: 0,
LastUpdate: time.Now(),
}, },
Hotspots: map[string]Hotspot{},
} }
} }
type AccountTx struct { type AccountTx struct {
DepositTotal int DepositTotal int
WithdrawalTotal int WithdrawalTotal int
}
// Hotspot represents a hotspot
type Hotspot struct {
Address string
LastUpdate time.Time LastUpdate time.Time
UpdateLock sync.Mutex RewardsTotal map[string]int
}
func NewHotspot(address string) Hotspot {
return Hotspot{
Address: address,
LastUpdate: time.Now(),
RewardsTotal: map[string]int{
"data_credits": 0,
"poc_challengers": 0,
"poc_challengees": 0,
"poc_witnesses": 0,
},
}
} }
const ( const (
@ -191,7 +220,7 @@ var (
prometheus.NewDesc( prometheus.NewDesc(
prometheus.BuildFQName(namespace, "account", "rewards_hnt_total"), prometheus.BuildFQName(namespace, "account", "rewards_hnt_total"),
"The number of HNT token rewarded to an account.", "The number of HNT token rewarded to an account.",
commonAccountLabels, nil, append(commonAccountLabels, "type"), nil,
), ),
prometheus.CounterValue, prometheus.CounterValue,
} }
@ -305,7 +334,7 @@ var (
prometheus.NewDesc( prometheus.NewDesc(
prometheus.BuildFQName(namespace, "hotspot", "rewards_hnt_total"), prometheus.BuildFQName(namespace, "hotspot", "rewards_hnt_total"),
"The number of HNT token rewarded to a hotspot.", "The number of HNT token rewarded to a hotspot.",
commonHotspotLabels, nil, append(commonHotspotLabels, "type"), nil,
), ),
prometheus.CounterValue, prometheus.CounterValue,
} }
@ -363,10 +392,9 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
go e.collectOracleMetrics(wg, ch) go e.collectOracleMetrics(wg, ch)
go e.collectStatsMetrics(wg, ch) go e.collectStatsMetrics(wg, ch)
for i := range e.Accounts { for i := range e.Accounts {
wg.Add(5) wg.Add(4)
go e.collectAccountMetrics(wg, ch, &e.Accounts[i]) go e.collectAccountMetrics(wg, ch, &e.Accounts[i])
go e.collectAccountActivityMetrics(wg, ch, &e.Accounts[i]) go e.collectAccountActivityMetrics(wg, ch, &e.Accounts[i])
go e.collectAccountRewardsTotalMetrics(wg, ch, &e.Accounts[i])
go e.collectAccountTransactionsMetrics(wg, ch, &e.Accounts[i]) go e.collectAccountTransactionsMetrics(wg, ch, &e.Accounts[i])
go e.collectHotspotMetrics(wg, ch, &e.Accounts[i]) go e.collectHotspotMetrics(wg, ch, &e.Accounts[i])
@ -465,22 +493,6 @@ func (e *Exporter) collectAccountActivityMetrics(wg *sync.WaitGroup, ch chan<- p
} }
} }
// collectAccountRewardsTotalMetrics collect the total rewards accumulated by an account from the helium api
func (e *Exporter) collectAccountRewardsTotalMetrics(wg *sync.WaitGroup, ch chan<- prometheus.Metric, account *Account) {
defer wg.Done()
accountRewardTotalsForAddress, err := heliumapi.GetRewardTotalsForAccount(account.Address, &e.StartTime, nil)
if err != nil {
log.Println(err)
return
}
ch <- prometheus.MustNewConstMetric(
accountRewardsHnt.Desc, accountRewardsHnt.Type, accountRewardTotalsForAddress.Sum/blockchain_hnt_factor,
account.Address,
)
}
// collectAccountTransactionsMetrics collect the total deposited/withdrawn by an account from the helium api // collectAccountTransactionsMetrics collect the total deposited/withdrawn by an account from the helium api
func (e *Exporter) collectAccountTransactionsMetrics(wg *sync.WaitGroup, ch chan<- prometheus.Metric, account *Account) { func (e *Exporter) collectAccountTransactionsMetrics(wg *sync.WaitGroup, ch chan<- prometheus.Metric, account *Account) {
defer wg.Done() defer wg.Done()
@ -502,14 +514,14 @@ func (e *Exporter) collectAccountTransactionsMetrics(wg *sync.WaitGroup, ch chan
// We can only want to allow a single instance of the routine doing // We can only want to allow a single instance of the routine doing
// calculations on the deposited and widthdrawn total. // calculations on the deposited and widthdrawn total.
account.Tx.UpdateLock.Lock() account.UpdateLock.Lock()
defer account.Tx.UpdateLock.Unlock() defer account.UpdateLock.Unlock()
// We want to keep in memory the timestamp of the last activity we // We want to keep in memory the timestamp of the last activity we
// received from the api. We cannot do something naive like [lastscrape, now] // received from the api. We cannot do something naive like [lastscrape, now]
// because the api can take a few seconds to sync with the chain and // because the api can take a few seconds to sync with the chain and
// we can miss some activities by doing it that way. // we can miss some activities by doing it that way.
lastActivityTime := account.Tx.LastUpdate.Unix() lastActivityTime := account.LastUpdate.Unix()
updateLastActivityTime := func(newTime int64) { updateLastActivityTime := func(newTime int64) {
if lastActivityTime < newTime { if lastActivityTime < newTime {
lastActivityTime = newTime lastActivityTime = newTime
@ -517,7 +529,7 @@ func (e *Exporter) collectAccountTransactionsMetrics(wg *sync.WaitGroup, ch chan
} }
// add 1 second to the last update time to avoid querying the same activity twice // add 1 second to the last update time to avoid querying the same activity twice
minTime := account.Tx.LastUpdate.Add(time.Second * 1) minTime := account.LastUpdate.Add(time.Second * 1)
// Explicitly set max_time, to avoid issues with server-side caching // Explicitly set max_time, to avoid issues with server-side caching
maxTime := time.Now() maxTime := time.Now()
activities, err := heliumapi.GetActivityForAccount(account.Address, activityTypes, &minTime, &maxTime) activities, err := heliumapi.GetActivityForAccount(account.Address, activityTypes, &minTime, &maxTime)
@ -565,12 +577,14 @@ func (e *Exporter) collectAccountTransactionsMetrics(wg *sync.WaitGroup, ch chan
} }
for _, activity := range activities.RewardsV1 { for _, activity := range activities.RewardsV1 {
for _, reward := range activity.Rewards { for _, reward := range activity.Rewards {
account.RewardsTotal[reward.Type] += reward.Amount
account.Tx.DepositTotal += reward.Amount account.Tx.DepositTotal += reward.Amount
} }
updateLastActivityTime(activity.Time) updateLastActivityTime(activity.Time)
} }
for _, activity := range activities.RewardsV2 { for _, activity := range activities.RewardsV2 {
for _, reward := range activity.Rewards { for _, reward := range activity.Rewards {
account.RewardsTotal[reward.Type] += reward.Amount
account.Tx.DepositTotal += reward.Amount account.Tx.DepositTotal += reward.Amount
} }
updateLastActivityTime(activity.Time) updateLastActivityTime(activity.Time)
@ -595,7 +609,14 @@ func (e *Exporter) collectAccountTransactionsMetrics(wg *sync.WaitGroup, ch chan
account.Tx.WithdrawalTotal += activity.StakeAmount account.Tx.WithdrawalTotal += activity.StakeAmount
updateLastActivityTime(activity.Time) updateLastActivityTime(activity.Time)
} }
account.Tx.LastUpdate = time.Unix(lastActivityTime, 0) account.LastUpdate = time.Unix(lastActivityTime, 0)
for rType, rTotal := range account.RewardsTotal {
ch <- prometheus.MustNewConstMetric(
accountRewardsHnt.Desc, accountRewardsHnt.Type, float64(rTotal)/blockchain_hnt_factor,
account.Address, rType,
)
}
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
accountDepositsHnt.Desc, accountDepositsHnt.Type, float64(account.Tx.DepositTotal)/blockchain_hnt_factor, accountDepositsHnt.Desc, accountDepositsHnt.Type, float64(account.Tx.DepositTotal)/blockchain_hnt_factor,
@ -742,17 +763,65 @@ func (e *Exporter) collectHotspotActivityMetrics(wg *sync.WaitGroup, ch chan<- p
func (e *Exporter) collectHotspotRewardsMetrics(wg *sync.WaitGroup, ch chan<- prometheus.Metric, account *Account, hotspotData heliumapi.Hotspot) { func (e *Exporter) collectHotspotRewardsMetrics(wg *sync.WaitGroup, ch chan<- prometheus.Metric, account *Account, hotspotData heliumapi.Hotspot) {
defer wg.Done() defer wg.Done()
hotspotRewardTotalsForAddress, err := heliumapi.GetRewardsTotalForHotspot(hotspotData.Address, &e.StartTime, nil) // We optimize our queries by asking the api only the activities we car about.
activityTypes := []string{
"rewards_v1",
"rewards_v2",
}
// TODO: optimize locks
account.UpdateLock.Lock()
defer account.UpdateLock.Unlock()
hotspot, ok := account.Hotspots[hotspotData.Address]
if !ok {
hotspot = NewHotspot(hotspotData.Address)
account.Hotspots[hotspotData.Address] = hotspot
}
// We want to keep in memory the timestamp of the last activity we
// received from the api. We cannot do something naive like [lastscrape, now]
// because the api can take a few seconds to sync with the chain and
// we can miss some activities by doing it that way.
lastActivityTime := hotspot.LastUpdate.Unix()
updateLastActivityTime := func(newTime int64) {
if lastActivityTime < newTime {
lastActivityTime = newTime
}
}
// add 1 second to the last update time to avoid querying the same activity twice
minTime := hotspot.LastUpdate.Add(time.Second * 1)
// Explicitly set max_time, to avoid issues with server-side caching
maxTime := time.Now()
activities, err := heliumapi.GetHotspotActivity(hotspot.Address, activityTypes, &minTime, &maxTime)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return
} }
for _, activity := range activities.RewardsV1 {
for _, reward := range activity.Rewards {
hotspot.RewardsTotal[reward.Type] += reward.Amount
}
updateLastActivityTime(activity.Time)
}
for _, activity := range activities.RewardsV2 {
for _, reward := range activity.Rewards {
hotspot.RewardsTotal[reward.Type] += reward.Amount
}
updateLastActivityTime(activity.Time)
}
hotspot.LastUpdate = time.Unix(lastActivityTime, 0)
for rType, rTotal := range hotspot.RewardsTotal {
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
hotspotRewardsHnt.Desc, hotspotRewardsHnt.Type, hotspotRewardTotalsForAddress.Sum/blockchain_hnt_factor, hotspotRewardsHnt.Desc, hotspotRewardsHnt.Type, float64(rTotal)/blockchain_hnt_factor,
account.Address, hotspotData.Address, hotspotData.Name, account.Address, hotspot.Address, hotspotData.Name, rType,
) )
} }
account.Hotspots[hotspot.Address] = hotspot
}
func main() { func main() {
fApiUrl := flag.String("apiUrl", "https://api.helium.io", "The helium api url") fApiUrl := flag.String("apiUrl", "https://api.helium.io", "The helium api url")

View File

@ -34,7 +34,7 @@ func GetActivityForAccount(account string, filterTypes []string, minTime *time.T
path := "/v1/accounts/" + account + "/activity" path := "/v1/accounts/" + account + "/activity"
params := map[string]string{} params := map[string]string{}
if filterTypes != nil { if filterTypes != nil {
params["min_time"] = strings.Join(filterTypes, ",") params["filter_types"] = strings.Join(filterTypes, ",")
} }
if minTime != nil { if minTime != nil {
params["min_time"] = minTime.UTC().Format(time.RFC3339) params["min_time"] = minTime.UTC().Format(time.RFC3339)
@ -83,33 +83,6 @@ func GetActivityCountsForAccount(account string) (*ActivityCounts, error) {
return &respobject.Data, nil return &respobject.Data, nil
} }
// query https://docs.helium.com/api/blockchain/accounts#reward-totals-for-an-account
func GetRewardTotalsForAccount(account string, minTime *time.Time, maxTime *time.Time) (*RewardTotal, error) {
path := "/v1/accounts/" + account + "/rewards/sum"
params := map[string]string{}
if minTime != nil {
params["min_time"] = minTime.UTC().Format(time.RFC3339)
}
if maxTime != nil {
params["max_time"] = maxTime.UTC().Format(time.RFC3339)
}
// query the api
respBody, err := getHeliumApi(path, &params)
if err != nil {
return nil, err
}
// unmarshal the response
respobject := RewardTotalResp{}
err = json.Unmarshal(respBody, &respobject)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal response from %v: %v", path, err)
}
return &respobject.Data, nil
}
// query https://docs.helium.com/api/blockchain/accounts#hotspots-for-account // query https://docs.helium.com/api/blockchain/accounts#hotspots-for-account
func GetHotspotsForAccount(account string) ([]Hotspot, error) { func GetHotspotsForAccount(account string) ([]Hotspot, error) {
path := "/v1/accounts/" + account + "/hotspots" path := "/v1/accounts/" + account + "/hotspots"

View File

@ -3,9 +3,46 @@ package heliumapi
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings"
"time" "time"
"github.com/helium-blockchain-exporter/heliumapi/activity"
) )
// query https://docs.helium.com/api/blockchain/hotspots#hotspot-activity
func GetHotspotActivity(hotspot string, filterTypes []string, minTime *time.Time, maxTime *time.Time) (*activity.Activities, error) {
path := "/v1/hotspots/" + hotspot + "/activity"
params := map[string]string{}
if filterTypes != nil {
params["filter_types"] = strings.Join(filterTypes, ",")
}
if minTime != nil {
params["min_time"] = minTime.UTC().Format(time.RFC3339)
}
if maxTime != nil {
params["max_time"] = maxTime.UTC().Format(time.RFC3339)
}
// query the api
resBodies, err := getHeliumApiWithCursor(path, &params)
if err != nil {
return nil, err
}
// unmarshal the responses and merge them all together
combinedResp := activity.ActivityResp{}
for _, respBody := range resBodies {
activityResp := activity.ActivityResp{}
err = json.Unmarshal(respBody, &activityResp)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal response from %v: %v", path, err)
}
combinedResp.Data = append(combinedResp.Data, activityResp.Data...)
}
return activity.NewActivities(combinedResp)
}
// query https://docs.helium.com/api/blockchain/hotspots#hotspots-activity-counts // query https://docs.helium.com/api/blockchain/hotspots#hotspots-activity-counts
func GetHotspotActivityCounts(hotspot string) (*ActivityCounts, error) { func GetHotspotActivityCounts(hotspot string) (*ActivityCounts, error) {
path := "/v1/hotspots/" + hotspot + "/activity/count" path := "/v1/hotspots/" + hotspot + "/activity/count"
@ -26,33 +63,6 @@ func GetHotspotActivityCounts(hotspot string) (*ActivityCounts, error) {
return &respobject.Data, nil return &respobject.Data, nil
} }
// query https://docs.helium.com/api/blockchain/hotspots#reward-total-for-a-hotspot
func GetRewardsTotalForHotspot(hotspot string, minTime *time.Time, maxTime *time.Time) (*RewardTotal, error) {
path := "/v1/hotspots/" + hotspot + "/rewards/sum"
params := map[string]string{}
if minTime != nil {
params["min_time"] = minTime.UTC().Format(time.RFC3339)
}
if maxTime != nil {
params["max_time"] = maxTime.UTC().Format(time.RFC3339)
}
// query the api
respBody, err := getHeliumApi(path, &params)
if err != nil {
return nil, err
}
// unmarshal the response
respobject := RewardTotalResp{}
err = json.Unmarshal(respBody, &respobject)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal response from %v: %v", path, err)
}
return &respobject.Data, nil
}
// query https://docs.helium.com/api/blockchain/hotspots#witnesses-for-a-hotspot // query https://docs.helium.com/api/blockchain/hotspots#witnesses-for-a-hotspot
func GetWitnessesForHotspot(hotspot string) ([]Hotspot, error) { func GetWitnessesForHotspot(hotspot string) ([]Hotspot, error) {
path := "/v1/hotspots/" + hotspot + "/witnesses" path := "/v1/hotspots/" + hotspot + "/witnesses"

View File

@ -4,7 +4,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log"
"net/http" "net/http"
) )
@ -48,7 +47,7 @@ func getHeliumApi(path string, params *map[string]string) ([]byte, error) {
} }
// query the api // query the api
log.Printf("querying %v", req.URL.String()) // log.Printf("querying %v", req.URL.String())
resp, err := client.Do(req) resp, err := client.Do(req)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to query %v: %v", req.URL.String(), err) return nil, fmt.Errorf("failed to query %v: %v", req.URL.String(), err)
@ -98,7 +97,7 @@ func getHeliumApiWithCursor(path string, params *map[string]string) ([][]byte, e
defer resp.Body.Close() defer resp.Body.Close()
// read the response body and add it to the result array // read the response body and add it to the result array
log.Printf("querying %v", req.URL.String()) // log.Printf("querying %v", req.URL.String())
body, err := ioutil.ReadAll(resp.Body) body, err := ioutil.ReadAll(resp.Body)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to read response body of %v: %v", req.URL.String(), err) return nil, fmt.Errorf("failed to read response body of %v: %v", req.URL.String(), err)