Merge 918fa11ab2
into 72eda9a24f
This commit is contained in:
commit
5bc1705210
|
@ -40,6 +40,7 @@ object InteractionGraphNegativeJob extends ScioBeamJob[InteractionGraphNegativeO
|
||||||
val negativeEdgeReverseOrdering = negativeEdgeOrdering.reverse
|
val negativeEdgeReverseOrdering = negativeEdgeOrdering.reverse
|
||||||
implicit val pqMonoid: PriorityQueueMonoid[Edge] =
|
implicit val pqMonoid: PriorityQueueMonoid[Edge] =
|
||||||
new PriorityQueueMonoid[Edge](maxDestinationIds)(negativeEdgeOrdering)
|
new PriorityQueueMonoid[Edge](maxDestinationIds)(negativeEdgeOrdering)
|
||||||
|
val accountLevelShadowbanMaxTime = 1 //in days
|
||||||
|
|
||||||
override protected def configurePipeline(
|
override protected def configurePipeline(
|
||||||
sc: ScioContext,
|
sc: ScioContext,
|
||||||
|
@ -49,17 +50,22 @@ object InteractionGraphNegativeJob extends ScioBeamJob[InteractionGraphNegativeO
|
||||||
val endTs = opts.interval.getEndMillis
|
val endTs = opts.interval.getEndMillis
|
||||||
|
|
||||||
// read input datasets
|
// read input datasets
|
||||||
|
|
||||||
|
// we only count blocks in the past X days to prevent permanent "shadow-banning".
|
||||||
val blocks: SCollection[InteractionGraphRawInput] =
|
val blocks: SCollection[InteractionGraphRawInput] =
|
||||||
GraphUtil.getFlockFeatures(
|
GraphUtil.getFlockFeatures(
|
||||||
readSnapshot(FlockBlocksEdgesScalaDataset, sc),
|
readSnapshot(FlockBlocksEdgesScalaDataset, sc),
|
||||||
FeatureName.NumBlocks,
|
FeatureName.NumBlocks,
|
||||||
endTs)
|
endTs)
|
||||||
|
.filter(_.age < accountLevelShadowbanMaxTime)
|
||||||
|
|
||||||
|
// we only count mutes in the past X days to prevent permanent "shadow-banning".
|
||||||
val mutes: SCollection[InteractionGraphRawInput] =
|
val mutes: SCollection[InteractionGraphRawInput] =
|
||||||
GraphUtil.getFlockFeatures(
|
GraphUtil.getFlockFeatures(
|
||||||
readSnapshot(FlockMutesEdgesScalaDataset, sc),
|
readSnapshot(FlockMutesEdgesScalaDataset, sc),
|
||||||
FeatureName.NumMutes,
|
FeatureName.NumMutes,
|
||||||
endTs)
|
endTs)
|
||||||
|
.filter(_.age < accountLevelShadowbanMaxTime)
|
||||||
|
|
||||||
val abuseReports: SCollection[InteractionGraphRawInput] =
|
val abuseReports: SCollection[InteractionGraphRawInput] =
|
||||||
GraphUtil.getFlockFeatures(
|
GraphUtil.getFlockFeatures(
|
||||||
|
@ -72,8 +78,9 @@ object InteractionGraphNegativeJob extends ScioBeamJob[InteractionGraphNegativeO
|
||||||
readSnapshot(FlockReportAsSpamEdgesScalaDataset, sc),
|
readSnapshot(FlockReportAsSpamEdgesScalaDataset, sc),
|
||||||
FeatureName.NumReportAsSpams,
|
FeatureName.NumReportAsSpams,
|
||||||
endTs)
|
endTs)
|
||||||
|
.filter(_.age < accountLevelShadowbanMaxTime)
|
||||||
|
|
||||||
// we only keep unfollows in the past 90 days due to the huge size of this dataset,
|
// we only keep unfollows in the past X days due to the huge size of this dataset,
|
||||||
// and to prevent permanent "shadow-banning" in the event of accidental unfollows.
|
// and to prevent permanent "shadow-banning" in the event of accidental unfollows.
|
||||||
// we treat unfollows as less critical than above 4 negative signals, since it deals more with
|
// we treat unfollows as less critical than above 4 negative signals, since it deals more with
|
||||||
// interest than health typically, which might change over time.
|
// interest than health typically, which might change over time.
|
||||||
|
@ -83,7 +90,7 @@ object InteractionGraphNegativeJob extends ScioBeamJob[InteractionGraphNegativeO
|
||||||
readSnapshot(SocialgraphUnfollowsScalaDataset, sc),
|
readSnapshot(SocialgraphUnfollowsScalaDataset, sc),
|
||||||
FeatureName.NumUnfollows,
|
FeatureName.NumUnfollows,
|
||||||
endTs)
|
endTs)
|
||||||
.filter(_.age < 90)
|
.filter(_.age < accountLevelShadowbanMaxTime)
|
||||||
|
|
||||||
// group all features by (src, dest)
|
// group all features by (src, dest)
|
||||||
val allEdgeFeatures: SCollection[Edge] =
|
val allEdgeFeatures: SCollection[Edge] =
|
||||||
|
|
Loading…
Reference in New Issue