diff --git a/src/scala/com/twitter/interaction_graph/scio/agg_negative/InteractionGraphNegativeJob.scala b/src/scala/com/twitter/interaction_graph/scio/agg_negative/InteractionGraphNegativeJob.scala index 479b67524..2f173df77 100644 --- a/src/scala/com/twitter/interaction_graph/scio/agg_negative/InteractionGraphNegativeJob.scala +++ b/src/scala/com/twitter/interaction_graph/scio/agg_negative/InteractionGraphNegativeJob.scala @@ -40,7 +40,8 @@ object InteractionGraphNegativeJob extends ScioBeamJob[InteractionGraphNegativeO val negativeEdgeReverseOrdering = negativeEdgeOrdering.reverse implicit val pqMonoid: PriorityQueueMonoid[Edge] = new PriorityQueueMonoid[Edge](maxDestinationIds)(negativeEdgeOrdering) - + val accountLevelShadowbanMaxTime = 1 //in days + override protected def configurePipeline( sc: ScioContext, opts: InteractionGraphNegativeOption @@ -49,17 +50,22 @@ object InteractionGraphNegativeJob extends ScioBeamJob[InteractionGraphNegativeO val endTs = opts.interval.getEndMillis // read input datasets + + // we only count blocks in the past X days to prevent permanent "shadow-banning". val blocks: SCollection[InteractionGraphRawInput] = GraphUtil.getFlockFeatures( readSnapshot(FlockBlocksEdgesScalaDataset, sc), FeatureName.NumBlocks, endTs) + .filter(_.age < accountLevelShadowbanMaxTime) + // we only count mutes in the past X days to prevent permanent "shadow-banning". val mutes: SCollection[InteractionGraphRawInput] = GraphUtil.getFlockFeatures( readSnapshot(FlockMutesEdgesScalaDataset, sc), FeatureName.NumMutes, endTs) + .filter(_.age < accountLevelShadowbanMaxTime) val abuseReports: SCollection[InteractionGraphRawInput] = GraphUtil.getFlockFeatures( @@ -73,7 +79,7 @@ object InteractionGraphNegativeJob extends ScioBeamJob[InteractionGraphNegativeO FeatureName.NumReportAsSpams, endTs) - // we only keep unfollows in the past 90 days due to the huge size of this dataset, + // we only keep unfollows in the past X days due to the huge size of this dataset, // and to prevent permanent "shadow-banning" in the event of accidental unfollows. // we treat unfollows as less critical than above 4 negative signals, since it deals more with // interest than health typically, which might change over time. @@ -83,7 +89,7 @@ object InteractionGraphNegativeJob extends ScioBeamJob[InteractionGraphNegativeO readSnapshot(SocialgraphUnfollowsScalaDataset, sc), FeatureName.NumUnfollows, endTs) - .filter(_.age < 90) + .filter(_.age < accountLevelShadowbanMaxTime) // group all features by (src, dest) val allEdgeFeatures: SCollection[Edge] =