This commit is contained in:
BAI Research 2023-07-29 02:36:29 +00:00 committed by GitHub
commit 63a868c458
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 9 additions and 3 deletions

View File

@ -40,7 +40,8 @@ object InteractionGraphNegativeJob extends ScioBeamJob[InteractionGraphNegativeO
val negativeEdgeReverseOrdering = negativeEdgeOrdering.reverse val negativeEdgeReverseOrdering = negativeEdgeOrdering.reverse
implicit val pqMonoid: PriorityQueueMonoid[Edge] = implicit val pqMonoid: PriorityQueueMonoid[Edge] =
new PriorityQueueMonoid[Edge](maxDestinationIds)(negativeEdgeOrdering) new PriorityQueueMonoid[Edge](maxDestinationIds)(negativeEdgeOrdering)
val accountLevelShadowbanMaxTime = 1 //in days
override protected def configurePipeline( override protected def configurePipeline(
sc: ScioContext, sc: ScioContext,
opts: InteractionGraphNegativeOption opts: InteractionGraphNegativeOption
@ -49,17 +50,22 @@ object InteractionGraphNegativeJob extends ScioBeamJob[InteractionGraphNegativeO
val endTs = opts.interval.getEndMillis val endTs = opts.interval.getEndMillis
// read input datasets // read input datasets
// we only count blocks in the past X days to prevent permanent "shadow-banning".
val blocks: SCollection[InteractionGraphRawInput] = val blocks: SCollection[InteractionGraphRawInput] =
GraphUtil.getFlockFeatures( GraphUtil.getFlockFeatures(
readSnapshot(FlockBlocksEdgesScalaDataset, sc), readSnapshot(FlockBlocksEdgesScalaDataset, sc),
FeatureName.NumBlocks, FeatureName.NumBlocks,
endTs) endTs)
.filter(_.age < accountLevelShadowbanMaxTime)
// we only count mutes in the past X days to prevent permanent "shadow-banning".
val mutes: SCollection[InteractionGraphRawInput] = val mutes: SCollection[InteractionGraphRawInput] =
GraphUtil.getFlockFeatures( GraphUtil.getFlockFeatures(
readSnapshot(FlockMutesEdgesScalaDataset, sc), readSnapshot(FlockMutesEdgesScalaDataset, sc),
FeatureName.NumMutes, FeatureName.NumMutes,
endTs) endTs)
.filter(_.age < accountLevelShadowbanMaxTime)
val abuseReports: SCollection[InteractionGraphRawInput] = val abuseReports: SCollection[InteractionGraphRawInput] =
GraphUtil.getFlockFeatures( GraphUtil.getFlockFeatures(
@ -73,7 +79,7 @@ object InteractionGraphNegativeJob extends ScioBeamJob[InteractionGraphNegativeO
FeatureName.NumReportAsSpams, FeatureName.NumReportAsSpams,
endTs) endTs)
// we only keep unfollows in the past 90 days due to the huge size of this dataset, // we only keep unfollows in the past X days due to the huge size of this dataset,
// and to prevent permanent "shadow-banning" in the event of accidental unfollows. // and to prevent permanent "shadow-banning" in the event of accidental unfollows.
// we treat unfollows as less critical than above 4 negative signals, since it deals more with // we treat unfollows as less critical than above 4 negative signals, since it deals more with
// interest than health typically, which might change over time. // interest than health typically, which might change over time.
@ -83,7 +89,7 @@ object InteractionGraphNegativeJob extends ScioBeamJob[InteractionGraphNegativeO
readSnapshot(SocialgraphUnfollowsScalaDataset, sc), readSnapshot(SocialgraphUnfollowsScalaDataset, sc),
FeatureName.NumUnfollows, FeatureName.NumUnfollows,
endTs) endTs)
.filter(_.age < 90) .filter(_.age < accountLevelShadowbanMaxTime)
// group all features by (src, dest) // group all features by (src, dest)
val allEdgeFeatures: SCollection[Edge] = val allEdgeFeatures: SCollection[Edge] =