From f1b5c327340f386170d094b98ed1da65e260fb28 Mon Sep 17 00:00:00 2001 From: twitter-team <> Date: Mon, 10 Apr 2023 11:50:24 -0700 Subject: [PATCH] Open-sourcing User Signal Service User Signal Service (USS) is a centralized online platform that supplies comprehensive data on user actions and behaviors on Twitter. This service stores information on both explicit signals, such as Favorites, Retweets, and replies, and implicit signals like Tweet clicks, profile visits, and more. --- user-signal-service/README.md | 5 + user-signal-service/server/BUILD | 21 ++ .../server/src/main/resources/BUILD | 7 + .../src/main/resources/config/decider.yml | 6 + .../server/src/main/resources/logback.xml | 155 +++++++++++ .../scala/com/twitter/usersignalservice/BUILD | 9 + ...UserSignalServiceStratoFedServerMain.scala | 32 +++ .../base/AggregatedSignalController.scala | 58 ++++ .../com/twitter/usersignalservice/base/BUILD | 16 ++ .../base/BaseSignalFetcher.scala | 90 +++++++ .../FilteredSignalFetcherController.scala | 75 ++++++ .../base/ManhattanSignalFetcher.scala | 66 +++++ .../base/MemcachedSignalFetcherWrapper.scala | 70 +++++ .../base/StratoSignalFetcher.scala | 61 +++++ .../twitter/usersignalservice/columns/BUILD | 11 + .../columns/UserSignalServiceColumn.scala | 49 ++++ .../twitter/usersignalservice/config/BUILD | 9 + .../config/SignalFetcherConfig.scala | 253 ++++++++++++++++++ .../twitter/usersignalservice/handler/BUILD | 14 + .../handler/UserSignalHandler.scala | 71 +++++ .../twitter/usersignalservice/module/BUILD | 25 ++ .../module/CacheModule.scala | 34 +++ .../module/MHMtlsParamsModule.scala | 18 ++ .../SocialGraphServiceClientModule.scala | 40 +++ .../module/TimerModule.scala | 12 + .../twitter/usersignalservice/service/BUILD | 13 + .../service/UserSignalService.scala | 26 ++ .../signals/AccountBlocksFetcher.scala | 40 +++ .../signals/AccountFollowsFetcher.scala | 44 +++ .../signals/AccountMutesFetcher.scala | 40 +++ .../twitter/usersignalservice/signals/BUILD | 34 +++ .../signals/NegativeEngagedTweetFetcher.scala | 97 +++++++ .../signals/NegativeEngagedUserFetcher.scala | 79 ++++++ .../NotificationOpenAndClickFetcher.scala | 145 ++++++++++ .../signals/OriginalTweetsFetcher.scala | 70 +++++ .../signals/ProfileClickFetcher.scala | 98 +++++++ .../signals/ProfileVisitsFetcher.scala | 143 ++++++++++ .../signals/RealGraphOonFetcher.scala | 70 +++++ .../signals/ReplyTweetsFetcher.scala | 70 +++++ .../signals/RetweetsFetcher.scala | 74 +++++ .../signals/SignalFilter.scala | 48 ++++ .../signals/TweetClickFetcher.scala | 94 +++++++ .../signals/TweetFavoritesFetcher.scala | 86 ++++++ .../signals/TweetSharesFetcher.scala | 77 ++++++ .../VideoTweetsPlayback50Fetcher.scala | 72 +++++ .../VideoTweetsQualityViewFetcher.scala | 72 +++++ .../usersignalservice/signals/common/BUILD | 15 ++ .../signals/common/SGSUtils.scala | 59 ++++ .../thrift/src/main/thrift/BUILD | 20 ++ .../src/main/thrift/client_identifier.thrift | 22 ++ .../thrift/src/main/thrift/service.thrift | 23 ++ .../thrift/src/main/thrift/signal.thrift | 113 ++++++++ 52 files changed, 2951 insertions(+) create mode 100644 user-signal-service/README.md create mode 100644 user-signal-service/server/BUILD create mode 100644 user-signal-service/server/src/main/resources/BUILD create mode 100644 user-signal-service/server/src/main/resources/config/decider.yml create mode 100644 user-signal-service/server/src/main/resources/logback.xml create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/BUILD create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/UserSignalServiceStratoFedServerMain.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/AggregatedSignalController.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/BUILD create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/BaseSignalFetcher.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/FilteredSignalFetcherController.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/ManhattanSignalFetcher.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/MemcachedSignalFetcherWrapper.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/StratoSignalFetcher.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/columns/BUILD create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/columns/UserSignalServiceColumn.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/config/BUILD create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/config/SignalFetcherConfig.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/handler/BUILD create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/handler/UserSignalHandler.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/BUILD create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/CacheModule.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/MHMtlsParamsModule.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/SocialGraphServiceClientModule.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/TimerModule.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/service/BUILD create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/service/UserSignalService.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/AccountBlocksFetcher.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/AccountFollowsFetcher.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/AccountMutesFetcher.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/BUILD create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/NegativeEngagedTweetFetcher.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/NegativeEngagedUserFetcher.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/NotificationOpenAndClickFetcher.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/OriginalTweetsFetcher.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/ProfileClickFetcher.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/ProfileVisitsFetcher.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/RealGraphOonFetcher.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/ReplyTweetsFetcher.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/RetweetsFetcher.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/SignalFilter.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/TweetClickFetcher.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/TweetFavoritesFetcher.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/TweetSharesFetcher.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/VideoTweetsPlayback50Fetcher.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/VideoTweetsQualityViewFetcher.scala create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/common/BUILD create mode 100644 user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/common/SGSUtils.scala create mode 100644 user-signal-service/thrift/src/main/thrift/BUILD create mode 100644 user-signal-service/thrift/src/main/thrift/client_identifier.thrift create mode 100644 user-signal-service/thrift/src/main/thrift/service.thrift create mode 100644 user-signal-service/thrift/src/main/thrift/signal.thrift diff --git a/user-signal-service/README.md b/user-signal-service/README.md new file mode 100644 index 000000000..d30568cf4 --- /dev/null +++ b/user-signal-service/README.md @@ -0,0 +1,5 @@ +# User Signal Service # + +**User Signal Service** (USS) is a centralized online platform that supplies comprehensive data on user actions and behaviors on Twitter. This information encompasses both explicit signals, such as favoriting, retweeting, and replying, as well as implicit signals, including tweet clicks, video views, profile visits, and more. + +To ensure consistency and accuracy, USS gathers these signals from various underlying datasets and online services, processing them into uniform formats. These standardized source signals are then utilized in candidate retrieval and machine learning features for ranking stages. \ No newline at end of file diff --git a/user-signal-service/server/BUILD b/user-signal-service/server/BUILD new file mode 100644 index 000000000..76ff96764 --- /dev/null +++ b/user-signal-service/server/BUILD @@ -0,0 +1,21 @@ +jvm_binary( + name = "bin", + basename = "user-signal-service", + main = "com.twitter.usersignalservice.UserSignalServiceStratoFedServerMain", + runtime_platform = "java11", + tags = ["bazel-compatible"], + dependencies = [ + "3rdparty/jvm/ch/qos/logback:logback-classic", + "loglens/loglens-logback/src/main/scala/com/twitter/loglens/logback", + "strato/src/main/scala/com/twitter/strato/logging/logback", + "user-signal-service/server/src/main/resources", + "user-signal-service/server/src/main/scala/com/twitter/usersignalservice", + ], +) + +# Aurora Workflows build phase convention requires a jvm_app named with ${project-name}-app +jvm_app( + name = "user-signal-service-app", + archive = "zip", + binary = ":bin", +) diff --git a/user-signal-service/server/src/main/resources/BUILD b/user-signal-service/server/src/main/resources/BUILD new file mode 100644 index 000000000..b35d9c9d4 --- /dev/null +++ b/user-signal-service/server/src/main/resources/BUILD @@ -0,0 +1,7 @@ +resources( + sources = [ + "*.xml", + "*.yml", + "config/*.yml", + ], +) diff --git a/user-signal-service/server/src/main/resources/config/decider.yml b/user-signal-service/server/src/main/resources/config/decider.yml new file mode 100644 index 000000000..f22a9dc22 --- /dev/null +++ b/user-signal-service/server/src/main/resources/config/decider.yml @@ -0,0 +1,6 @@ +test_value: + comment: Test Value + default_availability: 10000 +dark_traffic_percent: + comment: Percentage of traffic to send to dark traffic destination + default_availability: 0 \ No newline at end of file diff --git a/user-signal-service/server/src/main/resources/logback.xml b/user-signal-service/server/src/main/resources/logback.xml new file mode 100644 index 000000000..6511278df --- /dev/null +++ b/user-signal-service/server/src/main/resources/logback.xml @@ -0,0 +1,155 @@ + + + + + + + + + + + + + + + + + + + + + + true + + + + + ${log.service.output} + + ${log.service.output}.%i + 1 + 10 + + + 50MB + + + %date %.-3level ${DEFAULT_SERVICE_PATTERN}%n + + + + + + ${log.strato_only.output} + + ${log.strato_only.output}.%i + 1 + 10 + + + 50MB + + + %date %.-3level ${DEFAULT_SERVICE_PATTERN}%n + + + + + + true + loglens + ${log.lens.index} + ${log.lens.tag}/service + + %msg%n + + + 500 + 50 + + + manhattan-client + .*InvalidRequest.* + + + + + + + + + ${async_queue_size} + ${async_max_flush_time} + + + + + ${async_queue_size} + ${async_max_flush_time} + + + + + ${async_queue_size} + ${async_max_flush_time} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/BUILD b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/BUILD new file mode 100644 index 000000000..248fff64b --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/BUILD @@ -0,0 +1,9 @@ +scala_library( + compiler_option_sets = ["fatal_warnings"], + tags = ["bazel-compatible"], + dependencies = [ + "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base", + "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/columns", + "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/config", + ], +) diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/UserSignalServiceStratoFedServerMain.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/UserSignalServiceStratoFedServerMain.scala new file mode 100644 index 000000000..878310abb --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/UserSignalServiceStratoFedServerMain.scala @@ -0,0 +1,32 @@ +package com.twitter.usersignalservice + +import com.google.inject.Module +import com.twitter.inject.thrift.modules.ThriftClientIdModule +import com.twitter.usersignalservice.columns.UserSignalServiceColumn +import com.twitter.strato.fed._ +import com.twitter.strato.fed.server._ +import com.twitter.usersignalservice.module.CacheModule +import com.twitter.usersignalservice.module.MHMtlsParamsModule +import com.twitter.usersignalservice.module.SocialGraphServiceClientModule +import com.twitter.usersignalservice.module.TimerModule + +object UserSignalServiceStratoFedServerMain extends UserSignalServiceStratoFedServer + +trait UserSignalServiceStratoFedServer extends StratoFedServer { + override def dest: String = "/s/user-signal-service/user-signal-service" + + override def columns: Seq[Class[_ <: StratoFed.Column]] = + Seq( + classOf[UserSignalServiceColumn] + ) + + override def modules: Seq[Module] = + Seq( + CacheModule, + MHMtlsParamsModule, + SocialGraphServiceClientModule, + ThriftClientIdModule, + TimerModule, + ) + +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/AggregatedSignalController.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/AggregatedSignalController.scala new file mode 100644 index 000000000..fb698b01a --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/AggregatedSignalController.scala @@ -0,0 +1,58 @@ +package com.twitter.usersignalservice.base + +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.frigate.common.base.Stats +import com.twitter.storehaus.ReadableStore +import com.twitter.twistly.common.UserId +import com.twitter.usersignalservice.base.BaseSignalFetcher.Timeout +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.usersignalservice.thriftscala.SignalType +import com.twitter.util.Future +import com.twitter.util.Timer + +case class AggregatedSignalController( + signalsAggregationInfo: Seq[SignalAggregatedInfo], + signalsWeightMapInfo: Map[SignalType, Double], + stats: StatsReceiver, + timer: Timer) + extends ReadableStore[Query, Seq[Signal]] { + + val name: String = this.getClass.getCanonicalName + val statsReceiver: StatsReceiver = stats.scope(name) + + override def get(query: Query): Future[Option[Seq[Signal]]] = { + Stats + .trackItems(statsReceiver) { + val allSignalsFut = + Future + .collect(signalsAggregationInfo.map(_.getSignals(query.userId))).map(_.flatten.flatten) + val aggregatedSignals = + allSignalsFut.map { allSignals => + allSignals + .groupBy(_.targetInternalId).collect { + case (Some(internalId), signals) => + val mostRecentEnagementTime = signals.map(_.timestamp).max + val totalWeight = + signals + .map(signal => signalsWeightMapInfo.getOrElse(signal.signalType, 0.0)).sum + (Signal(query.signalType, mostRecentEnagementTime, Some(internalId)), totalWeight) + }.toSeq.sortBy { case (signal, weight) => (-weight, -signal.timestamp) } + .map(_._1) + .take(query.maxResults.getOrElse(Int.MaxValue)) + } + aggregatedSignals.map(Some(_)) + }.raiseWithin(Timeout)(timer).handle { + case e => + statsReceiver.counter(e.getClass.getCanonicalName).incr() + Some(Seq.empty[Signal]) + } + } +} + +case class SignalAggregatedInfo( + signalType: SignalType, + signalFetcher: ReadableStore[Query, Seq[Signal]]) { + def getSignals(userId: UserId): Future[Option[Seq[Signal]]] = { + signalFetcher.get(Query(userId, signalType, None)) + } +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/BUILD b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/BUILD new file mode 100644 index 000000000..83bb0aa3e --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/BUILD @@ -0,0 +1,16 @@ +scala_library( + compiler_option_sets = ["fatal_warnings"], + tags = ["bazel-compatible"], + dependencies = [ + "3rdparty/src/jvm/com/twitter/storehaus:core", + "finagle/finagle-stats", + "frigate/frigate-common/src/main/scala/com/twitter/frigate/common/store", + "frigate/frigate-common/src/main/scala/com/twitter/frigate/common/store/strato", + "hermit/hermit-core/src/main/scala/com/twitter/hermit/store/common", + "relevance-platform/src/main/scala/com/twitter/relevance_platform/common/injection", + "src/scala/com/twitter/storehaus_internal/manhattan", + "src/scala/com/twitter/twistly/common", + "src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift-scala", + "user-signal-service/thrift/src/main/thrift:thrift-scala", + ], +) diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/BaseSignalFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/BaseSignalFetcher.scala new file mode 100644 index 000000000..27646b9cc --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/BaseSignalFetcher.scala @@ -0,0 +1,90 @@ +package com.twitter.usersignalservice +package base + +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.storehaus.ReadableStore +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.util.Future +import com.twitter.twistly.common.UserId +import com.twitter.usersignalservice.thriftscala.SignalType +import com.twitter.frigate.common.base.Stats +import com.twitter.conversions.DurationOps._ +import com.twitter.usersignalservice.thriftscala.ClientIdentifier +import com.twitter.util.Duration +import com.twitter.util.Timer +import java.io.Serializable + +case class Query( + userId: UserId, + signalType: SignalType, + maxResults: Option[Int], + clientId: ClientIdentifier = ClientIdentifier.Unknown) + +/** + * A trait that defines a standard interface for the signal fetcher + * + * Extends this only when all other traits extending BaseSignalFetcher do not apply to + * your use case. + */ +trait BaseSignalFetcher extends ReadableStore[Query, Seq[Signal]] { + import BaseSignalFetcher._ + + /** + * This RawSignalType is the output type of `getRawSignals` and the input type of `process`. + * Override it as your own raw signal type to maintain meta data which can be used in the + * step of `process`. + * Note that the RawSignalType is an intermediate data type intended to be small to avoid + * big data chunks being passed over functions or being memcached. + */ + type RawSignalType <: Serializable + + def name: String + def statsReceiver: StatsReceiver + def timer: Timer + + /** + * This function is called by the top level class to fetch signals. It executes the pipeline to + * fetch raw signals, process and transform the signals. Exceptions and timeout control are + * handled here. + * @param query + * @return Future[Option[Seq[Signal]]] + */ + override def get(query: Query): Future[Option[Seq[Signal]]] = { + val clientStatsReceiver = statsReceiver.scope(query.clientId.name).scope(query.signalType.name) + Stats + .trackItems(clientStatsReceiver) { + val rawSignals = getRawSignals(query.userId) + val signals = process(query, rawSignals) + signals + }.raiseWithin(Timeout)(timer).handle { + case e => + clientStatsReceiver.scope("FetcherExceptions").counter(e.getClass.getCanonicalName).incr() + EmptyResponse + } + } + + /** + * Override this function to define how to fetch the raw signals from any store + * Note that the RawSignalType is an intermediate data type intended to be small to avoid + * big data chunks being passed over functions or being memcached. + * @param userId + * @return Future[Option[Seq[RawSignalType]]] + */ + def getRawSignals(userId: UserId): Future[Option[Seq[RawSignalType]]] + + /** + * Override this function to define how to process the raw signals and transform them to signals. + * @param query + * @param rawSignals + * @return Future[Option[Seq[Signal]]] + */ + def process( + query: Query, + rawSignals: Future[Option[Seq[RawSignalType]]] + ): Future[Option[Seq[Signal]]] +} + +object BaseSignalFetcher { + val Timeout: Duration = 20.milliseconds + val EmptyResponse: Option[Seq[Signal]] = Some(Seq.empty[Signal]) +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/FilteredSignalFetcherController.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/FilteredSignalFetcherController.scala new file mode 100644 index 000000000..e2e0e96fe --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/FilteredSignalFetcherController.scala @@ -0,0 +1,75 @@ +package com.twitter.usersignalservice.base + +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.frigate.common.base.Stats +import com.twitter.storehaus.ReadableStore +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.usersignalservice.thriftscala.SignalType +import com.twitter.util.Future +import com.twitter.util.Timer + +/** + * Combine a BaseSignalFetcher with a map of negative signalFetchers. Filter out the negative + * signals from the signals from BaseSignalFetcher. + */ +case class FilteredSignalFetcherController( + backingSignalFetcher: BaseSignalFetcher, + originSignalType: SignalType, + stats: StatsReceiver, + timer: Timer, + filterSignalFetchers: Map[SignalType, BaseSignalFetcher] = + Map.empty[SignalType, BaseSignalFetcher]) + extends ReadableStore[Query, Seq[Signal]] { + val statsReceiver: StatsReceiver = stats.scope(this.getClass.getCanonicalName) + + override def get(query: Query): Future[Option[Seq[Signal]]] = { + val clientStatsReceiver = statsReceiver.scope(query.signalType.name).scope(query.clientId.name) + Stats + .trackItems(clientStatsReceiver) { + val backingSignals = + backingSignalFetcher.get(Query(query.userId, originSignalType, None, query.clientId)) + val filteredSignals = filter(query, backingSignals) + filteredSignals + }.raiseWithin(BaseSignalFetcher.Timeout)(timer).handle { + case e => + clientStatsReceiver.scope("FetcherExceptions").counter(e.getClass.getCanonicalName).incr() + BaseSignalFetcher.EmptyResponse + } + } + + def filter( + query: Query, + rawSignals: Future[Option[Seq[Signal]]] + ): Future[Option[Seq[Signal]]] = { + Stats + .trackItems(statsReceiver) { + val originSignals = rawSignals.map(_.getOrElse(Seq.empty[Signal])) + val filterSignals = + Future + .collect { + filterSignalFetchers.map { + case (signalType, signalFetcher) => + signalFetcher + .get(Query(query.userId, signalType, None, query.clientId)) + .map(_.getOrElse(Seq.empty)) + }.toSeq + }.map(_.flatten.toSet) + val filterSignalsSet = filterSignals + .map(_.flatMap(_.targetInternalId)) + + val originSignalsWithId = + originSignals.map(_.map(signal => (signal, signal.targetInternalId))) + Future.join(originSignalsWithId, filterSignalsSet).map { + case (originSignalsWithId, filterSignalsSet) => + Some( + originSignalsWithId + .collect { + case (signal, internalIdOpt) + if internalIdOpt.nonEmpty && !filterSignalsSet.contains(internalIdOpt.get) => + signal + }.take(query.maxResults.getOrElse(Int.MaxValue))) + } + } + } + +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/ManhattanSignalFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/ManhattanSignalFetcher.scala new file mode 100644 index 000000000..d0918a165 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/ManhattanSignalFetcher.scala @@ -0,0 +1,66 @@ +package com.twitter.usersignalservice +package base + +import com.twitter.bijection.Codec +import com.twitter.storage.client.manhattan.kv.ManhattanKVClientMtlsParams +import com.twitter.storehaus.ReadableStore +import com.twitter.storehaus_internal.manhattan.ManhattanCluster +import com.twitter.storehaus_internal.manhattan.ManhattanRO +import com.twitter.storehaus_internal.manhattan.ManhattanROConfig +import com.twitter.storehaus_internal.util.HDFSPath +import com.twitter.twistly.common.UserId +import com.twitter.util.Future +import com.twitter.storehaus_internal.util.ApplicationID +import com.twitter.storehaus_internal.util.DatasetName + +/** + * A Manhattan signal fetcher extending BaseSignalFetcher to provide an interface to fetch signals + * from a Manhattan dataset. + * + * Extends this when the underlying store is a single Manhattan dataset. + * @tparam ManhattanKeyType + * @tparam ManhattanValueType + */ +trait ManhattanSignalFetcher[ManhattanKeyType, ManhattanValueType] extends BaseSignalFetcher { + /* + Define the meta info of the Manhattan dataset + */ + protected def manhattanAppId: String + protected def manhattanDatasetName: String + protected def manhattanClusterId: ManhattanCluster + protected def manhattanKVClientMtlsParams: ManhattanKVClientMtlsParams + + protected def manhattanKeyCodec: Codec[ManhattanKeyType] + protected def manhattanRawSignalCodec: Codec[ManhattanValueType] + + /** + * Adaptor to transform the userId to the ManhattanKey + * @param userId + * @return ManhattanKeyType + */ + protected def toManhattanKey(userId: UserId): ManhattanKeyType + + /** + * Adaptor to transform the ManhattanValue to the Seq of RawSignalType + * @param manhattanValue + * @return Seq[RawSignalType] + */ + protected def toRawSignals(manhattanValue: ManhattanValueType): Seq[RawSignalType] + + protected final lazy val underlyingStore: ReadableStore[UserId, Seq[RawSignalType]] = { + ManhattanRO + .getReadableStoreWithMtls[ManhattanKeyType, ManhattanValueType]( + ManhattanROConfig( + HDFSPath(""), + ApplicationID(manhattanAppId), + DatasetName(manhattanDatasetName), + manhattanClusterId), + manhattanKVClientMtlsParams + )(manhattanKeyCodec, manhattanRawSignalCodec) + .composeKeyMapping(userId => toManhattanKey(userId)) + .mapValues(manhattanRawSignal => toRawSignals(manhattanRawSignal)) + } + + override final def getRawSignals(userId: UserId): Future[Option[Seq[RawSignalType]]] = + underlyingStore.get(userId) +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/MemcachedSignalFetcherWrapper.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/MemcachedSignalFetcherWrapper.scala new file mode 100644 index 000000000..4022d9021 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/MemcachedSignalFetcherWrapper.scala @@ -0,0 +1,70 @@ +package com.twitter.usersignalservice +package base + +import com.twitter.finagle.memcached.{Client => MemcachedClient} +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.hashing.KeyHasher +import com.twitter.hermit.store.common.ObservedMemcachedReadableStore +import com.twitter.relevance_platform.common.injection.LZ4Injection +import com.twitter.relevance_platform.common.injection.SeqObjectInjection +import com.twitter.storehaus.ReadableStore +import com.twitter.twistly.common.UserId +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.util.Duration +import com.twitter.util.Future +import com.twitter.util.Timer + +/** + * Use this wrapper when the latency of the signal fetcher is too high (see BaseSignalFetcher.Timeout + * ) and the results from the signal fetcher don't change often (e.g. results are generated from a + * scalding job scheduled each day). + * @param memcachedClient + * @param baseSignalFetcher + * @param ttl + * @param stats + * @param timer + */ +case class MemcachedSignalFetcherWrapper( + memcachedClient: MemcachedClient, + baseSignalFetcher: BaseSignalFetcher, + ttl: Duration, + stats: StatsReceiver, + keyPrefix: String, + timer: Timer) + extends BaseSignalFetcher { + import MemcachedSignalFetcherWrapper._ + override type RawSignalType = baseSignalFetcher.RawSignalType + + override val name: String = this.getClass.getCanonicalName + override val statsReceiver: StatsReceiver = stats.scope(name).scope(baseSignalFetcher.name) + + val underlyingStore: ReadableStore[UserId, Seq[RawSignalType]] = { + val cacheUnderlyingStore = new ReadableStore[UserId, Seq[RawSignalType]] { + override def get(userId: UserId): Future[Option[Seq[RawSignalType]]] = + baseSignalFetcher.getRawSignals(userId) + } + ObservedMemcachedReadableStore.fromCacheClient( + backingStore = cacheUnderlyingStore, + cacheClient = memcachedClient, + ttl = ttl)( + valueInjection = LZ4Injection.compose(SeqObjectInjection[RawSignalType]()), + statsReceiver = statsReceiver, + keyToString = { k: UserId => + s"$keyPrefix:${keyHasher.hashKey(k.toString.getBytes)}" + } + ) + } + + override def getRawSignals(userId: UserId): Future[Option[Seq[RawSignalType]]] = + underlyingStore.get(userId) + + override def process( + query: Query, + rawSignals: Future[Option[Seq[RawSignalType]]] + ): Future[Option[Seq[Signal]]] = baseSignalFetcher.process(query, rawSignals) + +} + +object MemcachedSignalFetcherWrapper { + private val keyHasher: KeyHasher = KeyHasher.FNV1A_64 +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/StratoSignalFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/StratoSignalFetcher.scala new file mode 100644 index 000000000..2d0de84b6 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base/StratoSignalFetcher.scala @@ -0,0 +1,61 @@ +package com.twitter.usersignalservice +package base +import com.twitter.frigate.common.store.strato.StratoFetchableStore +import com.twitter.storehaus.ReadableStore +import com.twitter.strato.client.Client +import com.twitter.strato.data.Conv +import com.twitter.twistly.common.UserId +import com.twitter.util.Future + +/** + * A Strato signal fetcher extending BaseSignalFetcher to provide an interface to fetch signals from + * Strato Column. + * + * Extends this when the underlying store is a single Strato column. + * @tparam StratoKeyType + * @tparam StratoViewType + * @tparam StratoValueType + */ +trait StratoSignalFetcher[StratoKeyType, StratoViewType, StratoValueType] + extends BaseSignalFetcher { + /* + Define the meta info of the strato column + */ + def stratoClient: Client + def stratoColumnPath: String + def stratoView: StratoViewType + + /** + * Override these vals and remove the implicit key words. + * @return + */ + protected implicit def keyConv: Conv[StratoKeyType] + protected implicit def viewConv: Conv[StratoViewType] + protected implicit def valueConv: Conv[StratoValueType] + + /** + * Adapter to transform the userId to the StratoKeyType + * @param userId + * @return StratoKeyType + */ + protected def toStratoKey(userId: UserId): StratoKeyType + + /** + * Adapter to transform the StratoValueType to a Seq of RawSignalType + * @param stratoValue + * @return Seq[RawSignalType] + */ + protected def toRawSignals(stratoValue: StratoValueType): Seq[RawSignalType] + + protected final lazy val underlyingStore: ReadableStore[UserId, Seq[RawSignalType]] = + StratoFetchableStore + .withView[StratoKeyType, StratoViewType, StratoValueType]( + stratoClient, + stratoColumnPath, + stratoView) + .composeKeyMapping(toStratoKey) + .mapValues(toRawSignals) + + override final def getRawSignals(userId: UserId): Future[Option[Seq[RawSignalType]]] = + underlyingStore.get(userId) +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/columns/BUILD b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/columns/BUILD new file mode 100644 index 000000000..1cb85f732 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/columns/BUILD @@ -0,0 +1,11 @@ +scala_library( + compiler_option_sets = ["fatal_warnings"], + tags = ["bazel-compatible"], + dependencies = [ + "src/scala/com/twitter/twistly/common", + "strato/src/main/scala/com/twitter/strato/fed", + "strato/src/main/scala/com/twitter/strato/fed/server", + "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/service", + "user-signal-service/thrift/src/main/thrift:thrift-scala", + ], +) diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/columns/UserSignalServiceColumn.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/columns/UserSignalServiceColumn.scala new file mode 100644 index 000000000..aea92ecd1 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/columns/UserSignalServiceColumn.scala @@ -0,0 +1,49 @@ +package com.twitter.usersignalservice.columns + +import com.twitter.stitch.NotFound +import com.twitter.stitch.Stitch +import com.twitter.strato.catalog.OpMetadata +import com.twitter.strato.catalog.Ops +import com.twitter.strato.config.Policy +import com.twitter.strato.config.ReadWritePolicy +import com.twitter.strato.data.Conv +import com.twitter.strato.data.Description +import com.twitter.strato.data.Lifecycle +import com.twitter.strato.fed.StratoFed +import com.twitter.strato.thrift.ScroogeConv +import com.twitter.usersignalservice.service.UserSignalService +import com.twitter.usersignalservice.thriftscala.BatchSignalRequest +import com.twitter.usersignalservice.thriftscala.BatchSignalResponse +import javax.inject.Inject + +class UserSignalServiceColumn @Inject() (userSignalService: UserSignalService) + extends StratoFed.Column(UserSignalServiceColumn.Path) + with StratoFed.Fetch.Stitch { + + override val metadata: OpMetadata = OpMetadata( + lifecycle = Some(Lifecycle.Production), + description = Some(Description.PlainText("User Signal Service Federated Column"))) + + override def ops: Ops = super.ops + + override type Key = BatchSignalRequest + override type View = Unit + override type Value = BatchSignalResponse + + override val keyConv: Conv[Key] = ScroogeConv.fromStruct[BatchSignalRequest] + override val viewConv: Conv[View] = Conv.ofType + override val valueConv: Conv[Value] = ScroogeConv.fromStruct[BatchSignalResponse] + + override def fetch(key: Key, view: View): Stitch[Result[Value]] = { + userSignalService + .userSignalServiceHandlerStoreStitch(key) + .map(result => found(result)) + .handle { + case NotFound => missing + } + } +} + +object UserSignalServiceColumn { + val Path = "recommendations/user-signal-service/signals" +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/config/BUILD b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/config/BUILD new file mode 100644 index 000000000..cca1bf2e0 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/config/BUILD @@ -0,0 +1,9 @@ +scala_library( + compiler_option_sets = ["fatal_warnings"], + tags = ["bazel-compatible"], + dependencies = [ + "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base", + "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals", + "user-signal-service/thrift/src/main/thrift:thrift-scala", + ], +) diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/config/SignalFetcherConfig.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/config/SignalFetcherConfig.scala new file mode 100644 index 000000000..f7238edcc --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/config/SignalFetcherConfig.scala @@ -0,0 +1,253 @@ +package com.twitter.usersignalservice.config + +import com.twitter.conversions.DurationOps._ +import com.twitter.finagle.memcached.{Client => MemcachedClient} +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.storehaus.ReadableStore +import com.twitter.usersignalservice.base.BaseSignalFetcher +import com.twitter.usersignalservice.base.AggregatedSignalController +import com.twitter.usersignalservice.base.FilteredSignalFetcherController +import com.twitter.usersignalservice.base.MemcachedSignalFetcherWrapper +import com.twitter.usersignalservice.base.Query +import com.twitter.usersignalservice.base.SignalAggregatedInfo +import com.twitter.usersignalservice.signals.AccountBlocksFetcher +import com.twitter.usersignalservice.signals.AccountFollowsFetcher +import com.twitter.usersignalservice.signals.AccountMutesFetcher +import com.twitter.usersignalservice.signals.NotificationOpenAndClickFetcher +import com.twitter.usersignalservice.signals.OriginalTweetsFetcher +import com.twitter.usersignalservice.signals.ProfileVisitsFetcher +import com.twitter.usersignalservice.signals.ProfileClickFetcher +import com.twitter.usersignalservice.signals.RealGraphOonFetcher +import com.twitter.usersignalservice.signals.ReplyTweetsFetcher +import com.twitter.usersignalservice.signals.RetweetsFetcher +import com.twitter.usersignalservice.signals.TweetClickFetcher +import com.twitter.usersignalservice.signals.TweetFavoritesFetcher +import com.twitter.usersignalservice.signals.TweetSharesFetcher +import com.twitter.usersignalservice.signals.VideoTweetsPlayback50Fetcher +import com.twitter.usersignalservice.signals.VideoTweetsQualityViewFetcher +import com.twitter.usersignalservice.signals.NegativeEngagedUserFetcher +import com.twitter.usersignalservice.signals.NegativeEngagedTweetFetcher +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.usersignalservice.thriftscala.SignalType +import com.twitter.util.Timer +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +class SignalFetcherConfig @Inject() ( + notificationOpenAndClickFetcher: NotificationOpenAndClickFetcher, + accountFollowsFetcher: AccountFollowsFetcher, + profileVisitsFetcher: ProfileVisitsFetcher, + tweetFavoritesFetcher: TweetFavoritesFetcher, + retweetsFetcher: RetweetsFetcher, + replyTweetsFetcher: ReplyTweetsFetcher, + originalTweetsFetcher: OriginalTweetsFetcher, + tweetSharesFetcher: TweetSharesFetcher, + memcachedClient: MemcachedClient, + realGraphOonFetcher: RealGraphOonFetcher, + tweetClickFetcher: TweetClickFetcher, + videoTweetsPlayback50Fetcher: VideoTweetsPlayback50Fetcher, + videoTweetsQualityViewFetcher: VideoTweetsQualityViewFetcher, + accountMutesFetcher: AccountMutesFetcher, + accountBlocksFetcher: AccountBlocksFetcher, + profileClickFetcher: ProfileClickFetcher, + negativeEngagedTweetFetcher: NegativeEngagedTweetFetcher, + negativeEngagedUserFetcher: NegativeEngagedUserFetcher, + statsReceiver: StatsReceiver, + timer: Timer) { + + val MemcachedProfileVisitsFetcher: BaseSignalFetcher = + MemcachedSignalFetcherWrapper( + memcachedClient, + profileVisitsFetcher, + ttl = 8.hours, + statsReceiver, + keyPrefix = "uss:pv", + timer) + + val MemcachedAccountFollowsFetcher: BaseSignalFetcher = MemcachedSignalFetcherWrapper( + memcachedClient, + accountFollowsFetcher, + ttl = 5.minute, + statsReceiver, + keyPrefix = "uss:af", + timer) + + val GoodTweetClickDdgFetcher: SignalType => FilteredSignalFetcherController = signalType => + FilteredSignalFetcherController( + tweetClickFetcher, + signalType, + statsReceiver, + timer, + Map(SignalType.NegativeEngagedTweetId -> negativeEngagedTweetFetcher) + ) + + val GoodProfileClickDdgFetcher: SignalType => FilteredSignalFetcherController = signalType => + FilteredSignalFetcherController( + profileClickFetcher, + signalType, + statsReceiver, + timer, + Map(SignalType.NegativeEngagedUserId -> negativeEngagedUserFetcher) + ) + + val GoodProfileClickDdgFetcherWithBlocksMutes: SignalType => FilteredSignalFetcherController = + signalType => + FilteredSignalFetcherController( + profileClickFetcher, + signalType, + statsReceiver, + timer, + Map( + SignalType.NegativeEngagedUserId -> negativeEngagedUserFetcher, + SignalType.AccountMute -> accountMutesFetcher, + SignalType.AccountBlock -> accountBlocksFetcher + ) + ) + + val realGraphOonFilteredFetcher: FilteredSignalFetcherController = + FilteredSignalFetcherController( + realGraphOonFetcher, + SignalType.RealGraphOon, + statsReceiver, + timer, + Map( + SignalType.NegativeEngagedUserId -> negativeEngagedUserFetcher + ) + ) + + val videoTweetsQualityViewFilteredFetcher: FilteredSignalFetcherController = + FilteredSignalFetcherController( + videoTweetsQualityViewFetcher, + SignalType.VideoView90dQualityV1, + statsReceiver, + timer, + Map(SignalType.NegativeEngagedTweetId -> negativeEngagedTweetFetcher) + ) + + val videoTweetsPlayback50FilteredFetcher: FilteredSignalFetcherController = + FilteredSignalFetcherController( + videoTweetsPlayback50Fetcher, + SignalType.VideoView90dPlayback50V1, + statsReceiver, + timer, + Map(SignalType.NegativeEngagedTweetId -> negativeEngagedTweetFetcher) + ) + + val uniformTweetSignalInfo: Seq[SignalAggregatedInfo] = Seq( + SignalAggregatedInfo(SignalType.TweetFavorite, tweetFavoritesFetcher), + SignalAggregatedInfo(SignalType.Retweet, retweetsFetcher), + SignalAggregatedInfo(SignalType.Reply, replyTweetsFetcher), + SignalAggregatedInfo(SignalType.OriginalTweet, originalTweetsFetcher), + SignalAggregatedInfo(SignalType.TweetShareV1, tweetSharesFetcher), + SignalAggregatedInfo(SignalType.VideoView90dQualityV1, videoTweetsQualityViewFilteredFetcher), + ) + + val uniformProducerSignalInfo: Seq[SignalAggregatedInfo] = Seq( + SignalAggregatedInfo(SignalType.AccountFollow, MemcachedAccountFollowsFetcher), + SignalAggregatedInfo( + SignalType.RepeatedProfileVisit90dMinVisit6V1, + MemcachedProfileVisitsFetcher), + ) + + val memcachedAccountBlocksFetcher: MemcachedSignalFetcherWrapper = MemcachedSignalFetcherWrapper( + memcachedClient, + accountBlocksFetcher, + ttl = 5.minutes, + statsReceiver, + keyPrefix = "uss:ab", + timer) + + val memcachedAccountMutesFetcher: MemcachedSignalFetcherWrapper = MemcachedSignalFetcherWrapper( + memcachedClient, + accountMutesFetcher, + ttl = 5.minutes, + statsReceiver, + keyPrefix = "uss:am", + timer) + + val SignalFetcherMapper: Map[SignalType, ReadableStore[Query, Seq[Signal]]] = Map( + /* Raw Signals */ + SignalType.AccountFollow -> accountFollowsFetcher, + SignalType.AccountFollowWithDelay -> MemcachedAccountFollowsFetcher, + SignalType.GoodProfileClick -> GoodProfileClickDdgFetcher(SignalType.GoodProfileClick), + SignalType.GoodProfileClick20s -> GoodProfileClickDdgFetcher(SignalType.GoodProfileClick20s), + SignalType.GoodProfileClick30s -> GoodProfileClickDdgFetcher(SignalType.GoodProfileClick30s), + SignalType.GoodProfileClickFiltered -> GoodProfileClickDdgFetcherWithBlocksMutes( + SignalType.GoodProfileClick), + SignalType.GoodProfileClick20sFiltered -> GoodProfileClickDdgFetcherWithBlocksMutes( + SignalType.GoodProfileClick20s), + SignalType.GoodProfileClick30sFiltered -> GoodProfileClickDdgFetcherWithBlocksMutes( + SignalType.GoodProfileClick30s), + SignalType.GoodTweetClick -> GoodTweetClickDdgFetcher(SignalType.GoodTweetClick), + SignalType.GoodTweetClick5s -> GoodTweetClickDdgFetcher(SignalType.GoodTweetClick5s), + SignalType.GoodTweetClick10s -> GoodTweetClickDdgFetcher(SignalType.GoodTweetClick10s), + SignalType.GoodTweetClick30s -> GoodTweetClickDdgFetcher(SignalType.GoodTweetClick30s), + SignalType.NegativeEngagedTweetId -> negativeEngagedTweetFetcher, + SignalType.NegativeEngagedUserId -> negativeEngagedUserFetcher, + SignalType.NotificationOpenAndClickV1 -> notificationOpenAndClickFetcher, + SignalType.OriginalTweet -> originalTweetsFetcher, + SignalType.OriginalTweet90dV2 -> originalTweetsFetcher, + SignalType.RealGraphOon -> realGraphOonFilteredFetcher, + SignalType.RepeatedProfileVisit14dMinVisit2V1 -> MemcachedProfileVisitsFetcher, + SignalType.RepeatedProfileVisit14dMinVisit2V1NoNegative -> FilteredSignalFetcherController( + MemcachedProfileVisitsFetcher, + SignalType.RepeatedProfileVisit14dMinVisit2V1NoNegative, + statsReceiver, + timer, + Map( + SignalType.AccountMute -> accountMutesFetcher, + SignalType.AccountBlock -> accountBlocksFetcher) + ), + SignalType.RepeatedProfileVisit90dMinVisit6V1 -> MemcachedProfileVisitsFetcher, + SignalType.RepeatedProfileVisit90dMinVisit6V1NoNegative -> FilteredSignalFetcherController( + MemcachedProfileVisitsFetcher, + SignalType.RepeatedProfileVisit90dMinVisit6V1NoNegative, + statsReceiver, + timer, + Map( + SignalType.AccountMute -> accountMutesFetcher, + SignalType.AccountBlock -> accountBlocksFetcher), + ), + SignalType.RepeatedProfileVisit180dMinVisit6V1 -> MemcachedProfileVisitsFetcher, + SignalType.RepeatedProfileVisit180dMinVisit6V1NoNegative -> FilteredSignalFetcherController( + MemcachedProfileVisitsFetcher, + SignalType.RepeatedProfileVisit180dMinVisit6V1NoNegative, + statsReceiver, + timer, + Map( + SignalType.AccountMute -> accountMutesFetcher, + SignalType.AccountBlock -> accountBlocksFetcher), + ), + SignalType.Reply -> replyTweetsFetcher, + SignalType.Reply90dV2 -> replyTweetsFetcher, + SignalType.Retweet -> retweetsFetcher, + SignalType.Retweet90dV2 -> retweetsFetcher, + SignalType.TweetFavorite -> tweetFavoritesFetcher, + SignalType.TweetFavorite90dV2 -> tweetFavoritesFetcher, + SignalType.TweetShareV1 -> tweetSharesFetcher, + SignalType.VideoView90dQualityV1 -> videoTweetsQualityViewFilteredFetcher, + SignalType.VideoView90dPlayback50V1 -> videoTweetsPlayback50FilteredFetcher, + /* Aggregated Signals */ + SignalType.ProducerBasedUnifiedEngagementWeightedSignal -> AggregatedSignalController( + uniformProducerSignalInfo, + uniformProducerSignalEngagementAggregation, + statsReceiver, + timer + ), + SignalType.TweetBasedUnifiedEngagementWeightedSignal -> AggregatedSignalController( + uniformTweetSignalInfo, + uniformTweetSignalEngagementAggregation, + statsReceiver, + timer + ), + SignalType.AdFavorite -> tweetFavoritesFetcher, + /* Negative Signals */ + SignalType.AccountBlock -> memcachedAccountBlocksFetcher, + SignalType.AccountMute -> memcachedAccountMutesFetcher, + SignalType.TweetDontLike -> negativeEngagedTweetFetcher, + SignalType.TweetReport -> negativeEngagedTweetFetcher, + SignalType.TweetSeeFewer -> negativeEngagedTweetFetcher, + ) + +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/handler/BUILD b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/handler/BUILD new file mode 100644 index 000000000..96dbbeeaf --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/handler/BUILD @@ -0,0 +1,14 @@ +scala_library( + compiler_option_sets = ["fatal_warnings"], + tags = ["bazel-compatible"], + dependencies = [ + "src/scala/com/twitter/twistly/common", + "src/scala/com/twitter/twistly/store", + "src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift-scala", + "strato/src/main/scala/com/twitter/strato/fed", + "strato/src/main/scala/com/twitter/strato/fed/server", + "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/config", + "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module", + "user-signal-service/thrift/src/main/thrift:thrift-scala", + ], +) diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/handler/UserSignalHandler.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/handler/UserSignalHandler.scala new file mode 100644 index 000000000..6fea51c4c --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/handler/UserSignalHandler.scala @@ -0,0 +1,71 @@ +package com.twitter.usersignalservice.handler + +import com.twitter.storehaus.ReadableStore +import com.twitter.usersignalservice.thriftscala.BatchSignalRequest +import com.twitter.usersignalservice.thriftscala.BatchSignalResponse +import com.twitter.util.Future +import com.twitter.conversions.DurationOps._ +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.frigate.common.util.StatsUtil +import com.twitter.usersignalservice.config.SignalFetcherConfig +import com.twitter.usersignalservice.base.Query +import com.twitter.usersignalservice.thriftscala.ClientIdentifier +import com.twitter.usersignalservice.thriftscala.SignalType +import com.twitter.util.Duration +import com.twitter.util.Timer +import com.twitter.util.TimeoutException + +class UserSignalHandler( + signalFetcherConfig: SignalFetcherConfig, + timer: Timer, + stats: StatsReceiver) { + import UserSignalHandler._ + + val statsReceiver: StatsReceiver = stats.scope("user-signal-service/service") + + def getBatchSignalsResponse(request: BatchSignalRequest): Future[Option[BatchSignalResponse]] = { + StatsUtil.trackOptionStats(statsReceiver) { + val allSignals = request.signalRequest.map { signalRequest => + signalFetcherConfig + .SignalFetcherMapper(signalRequest.signalType) + .get(Query( + userId = request.userId, + signalType = signalRequest.signalType, + maxResults = signalRequest.maxResults.map(_.toInt), + clientId = request.clientId.getOrElse(ClientIdentifier.Unknown) + )) + .map(signalOpt => (signalRequest.signalType, signalOpt)) + } + + Future.collect(allSignals).map { signalsSeq => + val signalsMap = signalsSeq.map { + case (signalType: SignalType, signalsOpt) => + (signalType, signalsOpt.getOrElse(EmptySeq)) + }.toMap + Some(BatchSignalResponse(signalsMap)) + } + } + } + + def toReadableStore: ReadableStore[BatchSignalRequest, BatchSignalResponse] = { + new ReadableStore[BatchSignalRequest, BatchSignalResponse] { + override def get(request: BatchSignalRequest): Future[Option[BatchSignalResponse]] = { + getBatchSignalsResponse(request).raiseWithin(UserSignalServiceTimeout)(timer).rescue { + case _: TimeoutException => + statsReceiver.counter("endpointGetBatchSignals/failure/timeout").incr() + EmptyResponse + case e => + statsReceiver.counter("endpointGetBatchSignals/failure/" + e.getClass.getName).incr() + EmptyResponse + } + } + } + } +} + +object UserSignalHandler { + val UserSignalServiceTimeout: Duration = 25.milliseconds + + val EmptySeq: Seq[Nothing] = Seq.empty + val EmptyResponse: Future[Option[BatchSignalResponse]] = Future.value(Some(BatchSignalResponse())) +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/BUILD b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/BUILD new file mode 100644 index 000000000..d8e1e6a49 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/BUILD @@ -0,0 +1,25 @@ +scala_library( + compiler_option_sets = ["fatal_warnings"], + tags = ["bazel-compatible"], + dependencies = [ + "finagle-internal/mtls/src/main/scala/com/twitter/finagle/mtls/authentication", + "finagle-internal/mtls/src/main/scala/com/twitter/finagle/mtls/client", + "finagle/finagle-core/src/main", + "finagle/finagle-stats", + "finagle/finagle-thrift/src/main/scala", + "hermit/hermit-core/src/main/scala/com/twitter/hermit/predicate/socialgraph", + "relevance-platform/src/main/scala/com/twitter/relevance_platform/common/injection", + "servo/service/src/main/scala", + "src/scala/com/twitter/storehaus_internal/manhattan2", + "src/scala/com/twitter/storehaus_internal/memcache", + "src/scala/com/twitter/storehaus_internal/util", + "src/scala/com/twitter/twistly/common", + "src/scala/com/twitter/twistly/store", + "src/thrift/com/twitter/socialgraph:thrift-scala", + "stitch/stitch-storehaus", + "strato/src/main/scala/com/twitter/strato/fed", + "strato/src/main/scala/com/twitter/strato/fed/server", + "util/util-core:scala", + "util/util-stats/src/main/scala", + ], +) diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/CacheModule.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/CacheModule.scala new file mode 100644 index 000000000..38427b6ce --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/CacheModule.scala @@ -0,0 +1,34 @@ +package com.twitter.usersignalservice.module + +import com.google.inject.Provides +import javax.inject.Singleton +import com.twitter.finagle.memcached.Client +import com.twitter.finagle.mtls.authentication.ServiceIdentifier +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.inject.TwitterModule +import com.twitter.conversions.DurationOps._ +import com.twitter.storehaus_internal.memcache.MemcacheStore +import com.twitter.storehaus_internal.util.ZkEndPoint +import com.twitter.storehaus_internal.util.ClientName + +object CacheModule extends TwitterModule { + private val cacheDest = + flag[String](name = "cache_module.dest", help = "Path to memcache service") + private val timeout = + flag[Int](name = "memcache.timeout", help = "Memcache client timeout") + + @Singleton + @Provides + def providesCache( + serviceIdentifier: ServiceIdentifier, + stats: StatsReceiver + ): Client = + MemcacheStore.memcachedClient( + name = ClientName("memcache_user_signal_service"), + dest = ZkEndPoint(cacheDest()), + timeout = timeout().milliseconds, + retries = 0, + statsReceiver = stats.scope("memcache"), + serviceIdentifier = serviceIdentifier + ) +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/MHMtlsParamsModule.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/MHMtlsParamsModule.scala new file mode 100644 index 000000000..1ff1a7c5d --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/MHMtlsParamsModule.scala @@ -0,0 +1,18 @@ +package com.twitter.usersignalservice.module + +import com.twitter.finagle.mtls.authentication.ServiceIdentifier +import com.twitter.inject.TwitterModule +import com.twitter.storage.client.manhattan.kv.ManhattanKVClientMtlsParams +import com.google.inject.Provides +import javax.inject.Singleton + +object MHMtlsParamsModule extends TwitterModule { + + @Singleton + @Provides + def providesManhattanMtlsParams( + serviceIdentifier: ServiceIdentifier + ): ManhattanKVClientMtlsParams = { + ManhattanKVClientMtlsParams(serviceIdentifier) + } +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/SocialGraphServiceClientModule.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/SocialGraphServiceClientModule.scala new file mode 100644 index 000000000..194730261 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/SocialGraphServiceClientModule.scala @@ -0,0 +1,40 @@ +package com.twitter.usersignalservice.module + +import com.twitter.inject.Injector +import com.twitter.conversions.DurationOps._ +import com.twitter.finagle._ +import com.twitter.finagle.mux.ClientDiscardedRequestException +import com.twitter.finagle.service.ReqRep +import com.twitter.finagle.service.ResponseClass +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.finatra.mtls.thriftmux.modules.MtlsClient +import com.twitter.inject.thrift.modules.ThriftMethodBuilderClientModule +import com.twitter.util.Duration +import com.twitter.util.Throw +import com.twitter.socialgraph.thriftscala.SocialGraphService + +object SocialGraphServiceClientModule + extends ThriftMethodBuilderClientModule[ + SocialGraphService.ServicePerEndpoint, + SocialGraphService.MethodPerEndpoint + ] + with MtlsClient { + override val label = "socialgraph" + override val dest = "/s/socialgraph/socialgraph" + override val requestTimeout: Duration = 30.milliseconds + + override def configureThriftMuxClient( + injector: Injector, + client: ThriftMux.Client + ): ThriftMux.Client = { + super + .configureThriftMuxClient(injector, client) + .withStatsReceiver(injector.instance[StatsReceiver].scope("clnt")) + .withSessionQualifier + .successRateFailureAccrual(successRate = 0.9, window = 30.seconds) + .withResponseClassifier { + case ReqRep(_, Throw(_: ClientDiscardedRequestException)) => ResponseClass.Ignorable + } + } + +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/TimerModule.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/TimerModule.scala new file mode 100644 index 000000000..ffe26f8c4 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module/TimerModule.scala @@ -0,0 +1,12 @@ +package com.twitter.usersignalservice.module +import com.google.inject.Provides +import com.twitter.finagle.util.DefaultTimer +import com.twitter.inject.TwitterModule +import com.twitter.util.Timer +import javax.inject.Singleton + +object TimerModule extends TwitterModule { + @Singleton + @Provides + def providesTimer: Timer = DefaultTimer +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/service/BUILD b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/service/BUILD new file mode 100644 index 000000000..d1cd4e3a3 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/service/BUILD @@ -0,0 +1,13 @@ +scala_library( + compiler_option_sets = ["fatal_warnings"], + tags = ["bazel-compatible"], + dependencies = [ + "stitch/stitch-storehaus", + "strato/src/main/scala/com/twitter/strato/fed", + "strato/src/main/scala/com/twitter/strato/fed/server", + "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/config", + "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/handler", + "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/module", + "user-signal-service/thrift/src/main/thrift:thrift-scala", + ], +) diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/service/UserSignalService.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/service/UserSignalService.scala new file mode 100644 index 000000000..92d956001 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/service/UserSignalService.scala @@ -0,0 +1,26 @@ +package com.twitter.usersignalservice +package service + +import com.google.inject.Inject +import com.google.inject.Singleton +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.stitch.storehaus.StitchOfReadableStore +import com.twitter.usersignalservice.config.SignalFetcherConfig +import com.twitter.usersignalservice.handler.UserSignalHandler +import com.twitter.usersignalservice.thriftscala.BatchSignalRequest +import com.twitter.usersignalservice.thriftscala.BatchSignalResponse +import com.twitter.util.Timer + +@Singleton +class UserSignalService @Inject() ( + signalFetcherConfig: SignalFetcherConfig, + timer: Timer, + stats: StatsReceiver) { + + private val userSignalHandler = + new UserSignalHandler(signalFetcherConfig, timer, stats) + + val userSignalServiceHandlerStoreStitch: BatchSignalRequest => com.twitter.stitch.Stitch[ + BatchSignalResponse + ] = StitchOfReadableStore(userSignalHandler.toReadableStore) +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/AccountBlocksFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/AccountBlocksFetcher.scala new file mode 100644 index 000000000..a72348b7b --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/AccountBlocksFetcher.scala @@ -0,0 +1,40 @@ +package com.twitter.usersignalservice.signals + +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.socialgraph.thriftscala.RelationshipType +import com.twitter.socialgraph.thriftscala.SocialGraphService +import com.twitter.twistly.common.UserId +import com.twitter.usersignalservice.base.BaseSignalFetcher +import com.twitter.usersignalservice.base.Query +import com.twitter.usersignalservice.signals.common.SGSUtils +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.usersignalservice.thriftscala.SignalType +import com.twitter.util.Future +import com.twitter.util.Timer +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +case class AccountBlocksFetcher @Inject() ( + sgsClient: SocialGraphService.MethodPerEndpoint, + timer: Timer, + stats: StatsReceiver) + extends BaseSignalFetcher { + + override type RawSignalType = Signal + override val name: String = this.getClass.getCanonicalName + override val statsReceiver: StatsReceiver = stats.scope(this.name) + + override def getRawSignals( + userId: UserId + ): Future[Option[Seq[RawSignalType]]] = { + SGSUtils.getSGSRawSignals(userId, sgsClient, RelationshipType.Blocking, SignalType.AccountBlock) + } + + override def process( + query: Query, + rawSignals: Future[Option[Seq[RawSignalType]]] + ): Future[Option[Seq[Signal]]] = { + rawSignals.map(_.map(_.take(query.maxResults.getOrElse(Int.MaxValue)))) + } +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/AccountFollowsFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/AccountFollowsFetcher.scala new file mode 100644 index 000000000..60cc2bbd7 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/AccountFollowsFetcher.scala @@ -0,0 +1,44 @@ +package com.twitter.usersignalservice.signals + +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.socialgraph.thriftscala.RelationshipType +import com.twitter.socialgraph.thriftscala.SocialGraphService +import com.twitter.twistly.common.UserId +import com.twitter.usersignalservice.base.BaseSignalFetcher +import com.twitter.usersignalservice.base.Query +import com.twitter.usersignalservice.signals.common.SGSUtils +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.usersignalservice.thriftscala.SignalType +import com.twitter.util.Future +import com.twitter.util.Timer +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +case class AccountFollowsFetcher @Inject() ( + sgsClient: SocialGraphService.MethodPerEndpoint, + timer: Timer, + stats: StatsReceiver) + extends BaseSignalFetcher { + + override type RawSignalType = Signal + override val name: String = this.getClass.getCanonicalName + override val statsReceiver: StatsReceiver = stats.scope(this.name) + + override def getRawSignals( + userId: UserId + ): Future[Option[Seq[RawSignalType]]] = { + SGSUtils.getSGSRawSignals( + userId, + sgsClient, + RelationshipType.Following, + SignalType.AccountFollow) + } + + override def process( + query: Query, + rawSignals: Future[Option[Seq[RawSignalType]]] + ): Future[Option[Seq[Signal]]] = { + rawSignals.map(_.map(_.take(query.maxResults.getOrElse(Int.MaxValue)))) + } +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/AccountMutesFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/AccountMutesFetcher.scala new file mode 100644 index 000000000..27eb0a36d --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/AccountMutesFetcher.scala @@ -0,0 +1,40 @@ +package com.twitter.usersignalservice.signals + +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.socialgraph.thriftscala.RelationshipType +import com.twitter.socialgraph.thriftscala.SocialGraphService +import com.twitter.twistly.common.UserId +import com.twitter.usersignalservice.base.BaseSignalFetcher +import com.twitter.usersignalservice.base.Query +import com.twitter.usersignalservice.signals.common.SGSUtils +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.usersignalservice.thriftscala.SignalType +import com.twitter.util.Future +import com.twitter.util.Timer +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +case class AccountMutesFetcher @Inject() ( + sgsClient: SocialGraphService.MethodPerEndpoint, + timer: Timer, + stats: StatsReceiver) + extends BaseSignalFetcher { + + override type RawSignalType = Signal + override val name: String = this.getClass.getCanonicalName + override val statsReceiver: StatsReceiver = stats.scope(this.name) + + override def getRawSignals( + userId: UserId + ): Future[Option[Seq[RawSignalType]]] = { + SGSUtils.getSGSRawSignals(userId, sgsClient, RelationshipType.Muting, SignalType.AccountMute) + } + + override def process( + query: Query, + rawSignals: Future[Option[Seq[RawSignalType]]] + ): Future[Option[Seq[Signal]]] = { + rawSignals.map(_.map(_.take(query.maxResults.getOrElse(Int.MaxValue)))) + } +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/BUILD b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/BUILD new file mode 100644 index 000000000..50380a581 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/BUILD @@ -0,0 +1,34 @@ +scala_library( + compiler_option_sets = ["fatal_warnings"], + tags = ["bazel-compatible"], + dependencies = [ + "3rdparty/jvm/com/twitter/bijection:scrooge", + "3rdparty/jvm/javax/inject:javax.inject", + "3rdparty/src/jvm/com/twitter/storehaus:core", + "discovery-ds/src/main/thrift/com/twitter/dds/jobs/repeated_profile_visits:profile_visit-scala", + "flock-client/src/main/thrift:thrift-scala", + "frigate/frigate-common/src/main/scala/com/twitter/frigate/common/store/strato", + "hermit/hermit-core/src/main/scala/com/twitter/hermit/predicate/socialgraph", + "src/scala/com/twitter/scalding_internal/job", + "src/scala/com/twitter/simclusters_v2/common", + "src/scala/com/twitter/storehaus_internal/manhattan", + "src/scala/com/twitter/storehaus_internal/manhattan/config", + "src/scala/com/twitter/storehaus_internal/manhattan2", + "src/scala/com/twitter/storehaus_internal/offline", + "src/scala/com/twitter/storehaus_internal/util", + "src/scala/com/twitter/twistly/common", + "src/thrift/com/twitter/experiments/general_metrics:general_metrics-scala", + "src/thrift/com/twitter/frigate/data_pipeline:frigate-user-history-thrift-scala", + "src/thrift/com/twitter/onboarding/relevance/tweet_engagement:tweet_engagement-scala", + "src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift-scala", + "src/thrift/com/twitter/socialgraph:thrift-scala", + "src/thrift/com/twitter/traffic_attribution:traffic_attribution-scala", + "strato/src/main/scala/com/twitter/strato/client", + "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base", + "user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/common", + "user-signal-service/thrift/src/main/thrift:thrift-scala", + "util/util-core:util-core-util", + "util/util-core/src/main/java/com/twitter/util", + "util/util-stats/src/main/scala/com/twitter/finagle/stats", + ], +) diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/NegativeEngagedTweetFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/NegativeEngagedTweetFetcher.scala new file mode 100644 index 000000000..22c0b0852 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/NegativeEngagedTweetFetcher.scala @@ -0,0 +1,97 @@ +package com.twitter.usersignalservice.signals + +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.simclusters_v2.common.UserId +import com.twitter.simclusters_v2.thriftscala.InternalId +import com.twitter.strato.client.Client +import com.twitter.strato.data.Conv +import com.twitter.strato.thrift.ScroogeConv +import com.twitter.twistly.thriftscala.RecentNegativeEngagedTweet +import com.twitter.twistly.thriftscala.TweetNegativeEngagementType +import com.twitter.twistly.thriftscala.UserRecentNegativeEngagedTweets +import com.twitter.usersignalservice.base.Query +import com.twitter.usersignalservice.base.StratoSignalFetcher +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.usersignalservice.thriftscala.SignalType +import com.twitter.util.Future +import com.twitter.util.Timer +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +case class NegativeEngagedTweetFetcher @Inject() ( + stratoClient: Client, + timer: Timer, + stats: StatsReceiver) + extends StratoSignalFetcher[(UserId, Long), Unit, UserRecentNegativeEngagedTweets] { + + import NegativeEngagedTweetFetcher._ + + override type RawSignalType = RecentNegativeEngagedTweet + override val name: String = this.getClass.getCanonicalName + override val statsReceiver: StatsReceiver = stats.scope(name) + + override val stratoColumnPath: String = stratoPath + override val stratoView: Unit = None + + override protected val keyConv: Conv[(UserId, Long)] = Conv.ofType + override protected val viewConv: Conv[Unit] = Conv.ofType + override protected val valueConv: Conv[UserRecentNegativeEngagedTweets] = + ScroogeConv.fromStruct[UserRecentNegativeEngagedTweets] + + override protected def toStratoKey(userId: UserId): (UserId, Long) = (userId, defaultVersion) + + override protected def toRawSignals( + stratoValue: UserRecentNegativeEngagedTweets + ): Seq[RecentNegativeEngagedTweet] = { + stratoValue.recentNegativeEngagedTweets + } + + override def process( + query: Query, + rawSignals: Future[Option[Seq[RecentNegativeEngagedTweet]]] + ): Future[Option[Seq[Signal]]] = { + rawSignals.map { + _.map { signals => + signals + .filter(signal => negativeEngagedTweetTypeFilter(query.signalType, signal)) + .map { signal => + Signal( + query.signalType, + signal.engagedAt, + Some(InternalId.TweetId(signal.tweetId)) + ) + } + .groupBy(_.targetInternalId) // groupBy if there's duplicated authorIds + .mapValues(_.maxBy(_.timestamp)) + .values + .toSeq + .sortBy(-_.timestamp) + .take(query.maxResults.getOrElse(Int.MaxValue)) + } + } + } +} + +object NegativeEngagedTweetFetcher { + + val stratoPath = "recommendations/twistly/userRecentNegativeEngagedTweets" + private val defaultVersion = 0L + + private def negativeEngagedTweetTypeFilter( + signalType: SignalType, + signal: RecentNegativeEngagedTweet + ): Boolean = { + signalType match { + case SignalType.TweetDontLike => + signal.engagementType == TweetNegativeEngagementType.DontLike + case SignalType.TweetSeeFewer => + signal.engagementType == TweetNegativeEngagementType.SeeFewer + case SignalType.TweetReport => + signal.engagementType == TweetNegativeEngagementType.ReportClick + case SignalType.NegativeEngagedTweetId => true + case _ => false + } + } + +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/NegativeEngagedUserFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/NegativeEngagedUserFetcher.scala new file mode 100644 index 000000000..c07f61f91 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/NegativeEngagedUserFetcher.scala @@ -0,0 +1,79 @@ +package com.twitter.usersignalservice.signals + +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.simclusters_v2.common.UserId +import com.twitter.simclusters_v2.thriftscala.InternalId +import com.twitter.strato.client.Client +import com.twitter.strato.data.Conv +import com.twitter.strato.thrift.ScroogeConv +import com.twitter.twistly.thriftscala.RecentNegativeEngagedTweet +import com.twitter.twistly.thriftscala.UserRecentNegativeEngagedTweets +import com.twitter.usersignalservice.base.Query +import com.twitter.usersignalservice.base.StratoSignalFetcher +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.usersignalservice.thriftscala.SignalType +import com.twitter.util.Future +import com.twitter.util.Timer +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +case class NegativeEngagedUserFetcher @Inject() ( + stratoClient: Client, + timer: Timer, + stats: StatsReceiver) + extends StratoSignalFetcher[(UserId, Long), Unit, UserRecentNegativeEngagedTweets] { + + import NegativeEngagedUserFetcher._ + + override type RawSignalType = RecentNegativeEngagedTweet + override val name: String = this.getClass.getCanonicalName + override val statsReceiver: StatsReceiver = stats.scope(name) + + override val stratoColumnPath: String = stratoPath + override val stratoView: Unit = None + + override protected val keyConv: Conv[(UserId, Long)] = Conv.ofType + override protected val viewConv: Conv[Unit] = Conv.ofType + override protected val valueConv: Conv[UserRecentNegativeEngagedTweets] = + ScroogeConv.fromStruct[UserRecentNegativeEngagedTweets] + + override protected def toStratoKey(userId: UserId): (UserId, Long) = (userId, defaultVersion) + + override protected def toRawSignals( + stratoValue: UserRecentNegativeEngagedTweets + ): Seq[RecentNegativeEngagedTweet] = { + stratoValue.recentNegativeEngagedTweets + } + + override def process( + query: Query, + rawSignals: Future[Option[Seq[RecentNegativeEngagedTweet]]] + ): Future[Option[Seq[Signal]]] = { + rawSignals.map { + _.map { signals => + signals + .map { e => + Signal( + defaultNegativeSignalType, + e.engagedAt, + Some(InternalId.UserId(e.authorId)) + ) + } + .groupBy(_.targetInternalId) // groupBy if there's duplicated authorIds + .mapValues(_.maxBy(_.timestamp)) + .values + .toSeq + .sortBy(-_.timestamp) + } + } + } +} + +object NegativeEngagedUserFetcher { + + val stratoPath = "recommendations/twistly/userRecentNegativeEngagedTweets" + private val defaultVersion = 0L + private val defaultNegativeSignalType = SignalType.NegativeEngagedUserId + +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/NotificationOpenAndClickFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/NotificationOpenAndClickFetcher.scala new file mode 100644 index 000000000..5c40ec6a8 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/NotificationOpenAndClickFetcher.scala @@ -0,0 +1,145 @@ +package com.twitter.usersignalservice.signals + +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.frigate.common.store.strato.StratoFetchableStore +import com.twitter.frigate.data_pipeline.candidate_generation.thriftscala.ClientEngagementEvent +import com.twitter.frigate.data_pipeline.candidate_generation.thriftscala.LatestEvents +import com.twitter.frigate.data_pipeline.candidate_generation.thriftscala.LatestNegativeEngagementEvents +import com.twitter.simclusters_v2.thriftscala.InternalId +import com.twitter.storehaus.ReadableStore +import com.twitter.strato.client.Client +import com.twitter.twistly.common.TweetId +import com.twitter.twistly.common.UserId +import com.twitter.usersignalservice.base.BaseSignalFetcher +import com.twitter.usersignalservice.base.Query +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.usersignalservice.thriftscala.SignalType +import com.twitter.util.Future +import com.twitter.util.Timer +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +case class NotificationOpenAndClickFetcher @Inject() ( + stratoClient: Client, + timer: Timer, + stats: StatsReceiver) + extends BaseSignalFetcher { + import NotificationOpenAndClickFetcher._ + + override type RawSignalType = ClientEngagementEvent + override val name: String = this.getClass.getCanonicalName + override val statsReceiver: StatsReceiver = stats.scope(this.name) + + private val latestEventsStore: ReadableStore[UserId, LatestEvents] = { + StratoFetchableStore + .withUnitView[UserId, LatestEvents](stratoClient, latestEventStoreColumn) + } + + private val notificationNegativeEngagementStore: ReadableStore[UserId, Seq[ + NotificationNegativeEngagement + ]] = { + StratoFetchableStore + .withUnitView[UserId, LatestNegativeEngagementEvents]( + stratoClient, + labeledPushRecsNegativeEngagementsColumn).mapValues(fromLatestNegativeEngagementEvents) + } + + override def getRawSignals( + userId: UserId + ): Future[Option[Seq[RawSignalType]]] = { + val notificationNegativeEngagementEventsFut = + notificationNegativeEngagementStore.get(userId) + val latestEventsFut = latestEventsStore.get(userId) + + Future + .join(latestEventsFut, notificationNegativeEngagementEventsFut).map { + case (latestEventsOpt, latestNegativeEngagementEventsOpt) => + latestEventsOpt.map { latestEvents => + // Negative Engagement Events Filter + filterNegativeEngagementEvents( + latestEvents.engagementEvents, + latestNegativeEngagementEventsOpt.getOrElse(Seq.empty), + statsReceiver.scope("filterNegativeEngagementEvents")) + } + } + } + + override def process( + query: Query, + rawSignals: Future[Option[Seq[RawSignalType]]] + ): Future[Option[Seq[Signal]]] = { + rawSignals.map { + _.map { + _.take(query.maxResults.getOrElse(Int.MaxValue)).map { clientEngagementEvent => + Signal( + SignalType.NotificationOpenAndClickV1, + timestamp = clientEngagementEvent.timestampMillis, + targetInternalId = Some(InternalId.TweetId(clientEngagementEvent.tweetId)) + ) + } + } + } + } +} + +object NotificationOpenAndClickFetcher { + private val latestEventStoreColumn = "frigate/magicrecs/labeledPushRecsAggregated.User" + private val labeledPushRecsNegativeEngagementsColumn = + "frigate/magicrecs/labeledPushRecsNegativeEngagements.User" + + case class NotificationNegativeEngagement( + tweetId: TweetId, + timestampMillis: Long, + isNtabDisliked: Boolean, + isReportTweetClicked: Boolean, + isReportTweetDone: Boolean, + isReportUserClicked: Boolean, + isReportUserDone: Boolean) + + def fromLatestNegativeEngagementEvents( + latestNegativeEngagementEvents: LatestNegativeEngagementEvents + ): Seq[NotificationNegativeEngagement] = { + latestNegativeEngagementEvents.negativeEngagementEvents.map { event => + NotificationNegativeEngagement( + event.tweetId, + event.timestampMillis, + event.isNtabDisliked.getOrElse(false), + event.isReportTweetClicked.getOrElse(false), + event.isReportTweetDone.getOrElse(false), + event.isReportUserClicked.getOrElse(false), + event.isReportUserDone.getOrElse(false) + ) + } + } + + private def filterNegativeEngagementEvents( + engagementEvents: Seq[ClientEngagementEvent], + negativeEvents: Seq[NotificationNegativeEngagement], + statsReceiver: StatsReceiver + ): Seq[ClientEngagementEvent] = { + if (negativeEvents.nonEmpty) { + statsReceiver.counter("filterNegativeEngagementEvents").incr() + statsReceiver.stat("eventSizeBeforeFilter").add(engagementEvents.size) + + val negativeEngagementIdSet = + negativeEvents.collect { + case event + if event.isNtabDisliked || event.isReportTweetClicked || event.isReportTweetDone || event.isReportUserClicked || event.isReportUserDone => + event.tweetId + }.toSet + + // negative event size + statsReceiver.stat("negativeEventsSize").add(negativeEngagementIdSet.size) + + // filter out negative engagement sources + val result = engagementEvents.filterNot { event => + negativeEngagementIdSet.contains(event.tweetId) + } + + statsReceiver.stat("eventSizeAfterFilter").add(result.size) + + result + } else engagementEvents + } +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/OriginalTweetsFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/OriginalTweetsFetcher.scala new file mode 100644 index 000000000..46d5b8f9c --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/OriginalTweetsFetcher.scala @@ -0,0 +1,70 @@ +package com.twitter.usersignalservice.signals + +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.simclusters_v2.common.UserId +import com.twitter.simclusters_v2.thriftscala.InternalId +import com.twitter.strato.client.Client +import com.twitter.strato.data.Conv +import com.twitter.strato.thrift.ScroogeConv +import com.twitter.twistly.common.TwistlyProfile +import com.twitter.twistly.thriftscala.EngagementMetadata.OriginalTweetMetadata +import com.twitter.twistly.thriftscala.RecentEngagedTweet +import com.twitter.twistly.thriftscala.UserRecentEngagedTweets +import com.twitter.usersignalservice.base.Query +import com.twitter.usersignalservice.base.StratoSignalFetcher +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.util.Future +import com.twitter.util.Timer +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +case class OriginalTweetsFetcher @Inject() ( + stratoClient: Client, + timer: Timer, + stats: StatsReceiver) + extends StratoSignalFetcher[(UserId, Long), Unit, UserRecentEngagedTweets] { + import OriginalTweetsFetcher._ + override type RawSignalType = RecentEngagedTweet + override val name: String = this.getClass.getCanonicalName + override val statsReceiver: StatsReceiver = stats.scope(name) + + override val stratoColumnPath: String = + TwistlyProfile.TwistlyProdProfile.userRecentEngagedStorePath + override val stratoView: Unit = None + + override protected val keyConv: Conv[(UserId, Long)] = Conv.ofType + override protected val viewConv: Conv[Unit] = Conv.ofType + override protected val valueConv: Conv[UserRecentEngagedTweets] = + ScroogeConv.fromStruct[UserRecentEngagedTweets] + + override protected def toStratoKey(userId: UserId): (UserId, Long) = (userId, DefaultVersion) + + override protected def toRawSignals( + userRecentEngagedTweets: UserRecentEngagedTweets + ): Seq[RawSignalType] = + userRecentEngagedTweets.recentEngagedTweets + + override def process( + query: Query, + rawSignals: Future[Option[Seq[RawSignalType]]] + ): Future[Option[Seq[Signal]]] = { + rawSignals.map { + _.map { signals => + val lookBackWindowFilteredSignals = + SignalFilter.lookBackWindow90DayFilter(signals, query.signalType) + lookBackWindowFilteredSignals + .collect { + case RecentEngagedTweet(tweetId, engagedAt, _: OriginalTweetMetadata, _) => + Signal(query.signalType, engagedAt, Some(InternalId.TweetId(tweetId))) + }.take(query.maxResults.getOrElse(Int.MaxValue)) + } + } + } + +} + +object OriginalTweetsFetcher { + // see com.twitter.twistly.store.UserRecentEngagedTweetsStore + private val DefaultVersion = 0 +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/ProfileClickFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/ProfileClickFetcher.scala new file mode 100644 index 000000000..1b93df59d --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/ProfileClickFetcher.scala @@ -0,0 +1,98 @@ +package com.twitter.usersignalservice.signals + +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.simclusters_v2.common.UserId +import com.twitter.simclusters_v2.thriftscala.InternalId +import com.twitter.strato.client.Client +import com.twitter.strato.data.Conv +import com.twitter.strato.thrift.ScroogeConv +import com.twitter.twistly.thriftscala.RecentProfileClickImpressEvents +import com.twitter.twistly.thriftscala.ProfileClickImpressEvent +import com.twitter.usersignalservice.base.Query +import com.twitter.usersignalservice.base.StratoSignalFetcher +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.usersignalservice.thriftscala.SignalType +import com.twitter.util.Future +import com.twitter.util.Timer +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +case class ProfileClickFetcher @Inject() ( + stratoClient: Client, + timer: Timer, + stats: StatsReceiver) + extends StratoSignalFetcher[(UserId, Long), Unit, RecentProfileClickImpressEvents] { + + import ProfileClickFetcher._ + + override type RawSignalType = ProfileClickImpressEvent + override val name: String = this.getClass.getCanonicalName + override val statsReceiver: StatsReceiver = stats.scope(name) + + override val stratoColumnPath: String = stratoPath + override val stratoView: Unit = None + + override protected val keyConv: Conv[(UserId, Long)] = Conv.ofType + override protected val viewConv: Conv[Unit] = Conv.ofType + override protected val valueConv: Conv[RecentProfileClickImpressEvents] = + ScroogeConv.fromStruct[RecentProfileClickImpressEvents] + + override protected def toStratoKey(userId: UserId): (UserId, Long) = (userId, defaultVersion) + + override protected def toRawSignals( + stratoValue: RecentProfileClickImpressEvents + ): Seq[ProfileClickImpressEvent] = { + stratoValue.events + } + + override def process( + query: Query, + rawSignals: Future[Option[Seq[ProfileClickImpressEvent]]] + ): Future[Option[Seq[Signal]]] = { + rawSignals.map { events => + events + .map { clicks => + clicks + .filter(dwelltimeFilter(_, query.signalType)) + .map(signalFromProfileClick(_, query.signalType)) + .sortBy(-_.timestamp) + .take(query.maxResults.getOrElse(Int.MaxValue)) + } + } + } +} + +object ProfileClickFetcher { + + val stratoPath = "recommendations/twistly/userRecentProfileClickImpress" + private val defaultVersion = 0L + private val sec2millis: Int => Long = i => i * 1000L + private val minDwellTimeMap: Map[SignalType, Long] = Map( + SignalType.GoodProfileClick -> sec2millis(10), + SignalType.GoodProfileClick20s -> sec2millis(20), + SignalType.GoodProfileClick30s -> sec2millis(30), + SignalType.GoodProfileClickFiltered -> sec2millis(10), + SignalType.GoodProfileClick20sFiltered -> sec2millis(20), + SignalType.GoodProfileClick30sFiltered -> sec2millis(30), + ) + + def signalFromProfileClick( + profileClickImpressEvent: ProfileClickImpressEvent, + signalType: SignalType + ): Signal = { + Signal( + signalType, + profileClickImpressEvent.engagedAt, + Some(InternalId.UserId(profileClickImpressEvent.entityId)) + ) + } + + def dwelltimeFilter( + profileClickImpressEvent: ProfileClickImpressEvent, + signalType: SignalType + ): Boolean = { + val goodClickDwellTime = minDwellTimeMap(signalType) + profileClickImpressEvent.clickImpressEventMetadata.totalDwellTime >= goodClickDwellTime + } +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/ProfileVisitsFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/ProfileVisitsFetcher.scala new file mode 100644 index 000000000..1cb27261f --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/ProfileVisitsFetcher.scala @@ -0,0 +1,143 @@ +package com.twitter.usersignalservice.signals + +import com.twitter.bijection.Codec +import com.twitter.bijection.scrooge.BinaryScalaCodec +import com.twitter.dds.jobs.repeated_profile_visits.thriftscala.ProfileVisitSet +import com.twitter.dds.jobs.repeated_profile_visits.thriftscala.ProfileVisitorInfo +import com.twitter.experiments.general_metrics.thriftscala.IdType +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.simclusters_v2.thriftscala.InternalId +import com.twitter.storage.client.manhattan.kv.ManhattanKVClientMtlsParams +import com.twitter.storehaus_internal.manhattan.Apollo +import com.twitter.storehaus_internal.manhattan.ManhattanCluster +import com.twitter.twistly.common.UserId +import com.twitter.usersignalservice.base.ManhattanSignalFetcher +import com.twitter.usersignalservice.base.Query +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.usersignalservice.thriftscala.SignalType +import com.twitter.util.Future +import com.twitter.util.Timer +import javax.inject.Inject +import javax.inject.Singleton + +case class ProfileVisitMetadata( + targetId: Option[Long], + totalTargetVisitsInLast14Days: Option[Int], + totalTargetVisitsInLast90Days: Option[Int], + totalTargetVisitsInLast180Days: Option[Int], + latestTargetVisitTimestampInLast90Days: Option[Long]) + +@Singleton +case class ProfileVisitsFetcher @Inject() ( + manhattanKVClientMtlsParams: ManhattanKVClientMtlsParams, + timer: Timer, + stats: StatsReceiver) + extends ManhattanSignalFetcher[ProfileVisitorInfo, ProfileVisitSet] { + import ProfileVisitsFetcher._ + + override type RawSignalType = ProfileVisitMetadata + + override val manhattanAppId: String = MHAppId + override val manhattanDatasetName: String = MHDatasetName + override val manhattanClusterId: ManhattanCluster = Apollo + override val manhattanKeyCodec: Codec[ProfileVisitorInfo] = BinaryScalaCodec(ProfileVisitorInfo) + override val manhattanRawSignalCodec: Codec[ProfileVisitSet] = BinaryScalaCodec(ProfileVisitSet) + + override protected def toManhattanKey(userId: UserId): ProfileVisitorInfo = + ProfileVisitorInfo(userId, IdType.User) + + override protected def toRawSignals(manhattanValue: ProfileVisitSet): Seq[ProfileVisitMetadata] = + manhattanValue.profileVisitSet + .map { + _.collect { + // only keep the Non-NSFW and not-following profile visits + case profileVisit + if profileVisit.targetId.nonEmpty + // The below check covers 180 days, not only 90 days as the name implies. + // See comment on [[ProfileVisit.latestTargetVisitTimestampInLast90Days]] thrift. + && profileVisit.latestTargetVisitTimestampInLast90Days.nonEmpty + && !profileVisit.isTargetNSFW.getOrElse(false) + && !profileVisit.doesSourceIdFollowTargetId.getOrElse(false) => + ProfileVisitMetadata( + targetId = profileVisit.targetId, + totalTargetVisitsInLast14Days = profileVisit.totalTargetVisitsInLast14Days, + totalTargetVisitsInLast90Days = profileVisit.totalTargetVisitsInLast90Days, + totalTargetVisitsInLast180Days = profileVisit.totalTargetVisitsInLast180Days, + latestTargetVisitTimestampInLast90Days = + profileVisit.latestTargetVisitTimestampInLast90Days + ) + }.toSeq + }.getOrElse(Seq.empty) + + override val name: String = this.getClass.getCanonicalName + + override val statsReceiver: StatsReceiver = stats.scope(name) + + override def process( + query: Query, + rawSignals: Future[Option[Seq[ProfileVisitMetadata]]] + ): Future[Option[Seq[Signal]]] = rawSignals.map { profiles => + profiles + .map { + _.filter(profileVisitMetadata => visitCountFilter(profileVisitMetadata, query.signalType)) + .sortBy(profileVisitMetadata => + -visitCountMap(query.signalType)(profileVisitMetadata).getOrElse(0)) + .map(profileVisitMetadata => + signalFromProfileVisit(profileVisitMetadata, query.signalType)) + .take(query.maxResults.getOrElse(Int.MaxValue)) + } + } +} + +object ProfileVisitsFetcher { + private val MHAppId = "repeated_profile_visits_aggregated" + private val MHDatasetName = "repeated_profile_visits_aggregated" + + private val minVisitCountMap: Map[SignalType, Int] = Map( + SignalType.RepeatedProfileVisit14dMinVisit2V1 -> 2, + SignalType.RepeatedProfileVisit14dMinVisit2V1NoNegative -> 2, + SignalType.RepeatedProfileVisit90dMinVisit6V1 -> 6, + SignalType.RepeatedProfileVisit90dMinVisit6V1NoNegative -> 6, + SignalType.RepeatedProfileVisit180dMinVisit6V1 -> 6, + SignalType.RepeatedProfileVisit180dMinVisit6V1NoNegative -> 6 + ) + + private val visitCountMap: Map[SignalType, ProfileVisitMetadata => Option[Int]] = Map( + SignalType.RepeatedProfileVisit14dMinVisit2V1 -> + ((profileVisitMetadata: ProfileVisitMetadata) => + profileVisitMetadata.totalTargetVisitsInLast14Days), + SignalType.RepeatedProfileVisit14dMinVisit2V1NoNegative -> + ((profileVisitMetadata: ProfileVisitMetadata) => + profileVisitMetadata.totalTargetVisitsInLast14Days), + SignalType.RepeatedProfileVisit90dMinVisit6V1 -> + ((profileVisitMetadata: ProfileVisitMetadata) => + profileVisitMetadata.totalTargetVisitsInLast90Days), + SignalType.RepeatedProfileVisit90dMinVisit6V1NoNegative -> + ((profileVisitMetadata: ProfileVisitMetadata) => + profileVisitMetadata.totalTargetVisitsInLast90Days), + SignalType.RepeatedProfileVisit180dMinVisit6V1 -> + ((profileVisitMetadata: ProfileVisitMetadata) => + profileVisitMetadata.totalTargetVisitsInLast180Days), + SignalType.RepeatedProfileVisit180dMinVisit6V1NoNegative -> + ((profileVisitMetadata: ProfileVisitMetadata) => + profileVisitMetadata.totalTargetVisitsInLast180Days) + ) + + def signalFromProfileVisit( + profileVisitMetadata: ProfileVisitMetadata, + signalType: SignalType + ): Signal = { + Signal( + signalType, + profileVisitMetadata.latestTargetVisitTimestampInLast90Days.get, + profileVisitMetadata.targetId.map(targetId => InternalId.UserId(targetId)) + ) + } + + def visitCountFilter( + profileVisitMetadata: ProfileVisitMetadata, + signalType: SignalType + ): Boolean = { + visitCountMap(signalType)(profileVisitMetadata).exists(_ >= minVisitCountMap(signalType)) + } +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/RealGraphOonFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/RealGraphOonFetcher.scala new file mode 100644 index 000000000..ad5cc4f4b --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/RealGraphOonFetcher.scala @@ -0,0 +1,70 @@ +package com.twitter.usersignalservice.signals + +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.simclusters_v2.common.UserId +import com.twitter.simclusters_v2.thriftscala.InternalId +import com.twitter.strato.client.Client +import com.twitter.strato.data.Conv +import com.twitter.strato.thrift.ScroogeConv +import com.twitter.usersignalservice.base.Query +import com.twitter.wtf.candidate.thriftscala.CandidateSeq +import com.twitter.wtf.candidate.thriftscala.Candidate +import com.twitter.usersignalservice.base.StratoSignalFetcher +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.usersignalservice.thriftscala.SignalType +import com.twitter.util.Future +import com.twitter.util.Timer +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +case class RealGraphOonFetcher @Inject() ( + stratoClient: Client, + timer: Timer, + stats: StatsReceiver) + extends StratoSignalFetcher[UserId, Unit, CandidateSeq] { + import RealGraphOonFetcher._ + override type RawSignalType = Candidate + override val name: String = this.getClass.getCanonicalName + override val statsReceiver: StatsReceiver = stats.scope(name) + + override val stratoColumnPath: String = RealGraphOonFetcher.stratoColumnPath + override val stratoView: Unit = None + + override protected val keyConv: Conv[UserId] = Conv.ofType + override protected val viewConv: Conv[Unit] = Conv.ofType + override protected val valueConv: Conv[CandidateSeq] = + ScroogeConv.fromStruct[CandidateSeq] + + override protected def toStratoKey(userId: UserId): UserId = userId + + override protected def toRawSignals( + realGraphOonCandidates: CandidateSeq + ): Seq[RawSignalType] = realGraphOonCandidates.candidates + + override def process( + query: Query, + rawSignals: Future[Option[Seq[RawSignalType]]] + ): Future[Option[Seq[Signal]]] = { + rawSignals + .map { + _.map( + _.sortBy(-_.score) + .collect { + case c if c.score >= MinRgScore => + Signal( + SignalType.RealGraphOon, + RealGraphOonFetcher.DefaultTimestamp, + Some(InternalId.UserId(c.userId))) + }.take(query.maxResults.getOrElse(Int.MaxValue))) + } + } +} + +object RealGraphOonFetcher { + val stratoColumnPath = "recommendations/real_graph/realGraphScoresOon.User" + // quality threshold for real graph score + private val MinRgScore = 0.0 + // no timestamp for RealGraph Candidates, set default as 0L + private val DefaultTimestamp = 0L +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/ReplyTweetsFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/ReplyTweetsFetcher.scala new file mode 100644 index 000000000..7f84f41c9 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/ReplyTweetsFetcher.scala @@ -0,0 +1,70 @@ +package com.twitter.usersignalservice.signals + +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.simclusters_v2.common.UserId +import com.twitter.simclusters_v2.thriftscala.InternalId +import com.twitter.strato.client.Client +import com.twitter.strato.data.Conv +import com.twitter.strato.thrift.ScroogeConv +import com.twitter.twistly.common.TwistlyProfile +import com.twitter.twistly.thriftscala.EngagementMetadata.ReplyTweetMetadata +import com.twitter.twistly.thriftscala.RecentEngagedTweet +import com.twitter.twistly.thriftscala.UserRecentEngagedTweets +import com.twitter.usersignalservice.base.Query +import com.twitter.usersignalservice.base.StratoSignalFetcher +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.util.Future +import com.twitter.util.Timer +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +case class ReplyTweetsFetcher @Inject() ( + stratoClient: Client, + timer: Timer, + stats: StatsReceiver) + extends StratoSignalFetcher[(UserId, Long), Unit, UserRecentEngagedTweets] { + import ReplyTweetsFetcher._ + override type RawSignalType = RecentEngagedTweet + override val name: String = this.getClass.getCanonicalName + override val statsReceiver: StatsReceiver = stats.scope(name) + + override val stratoColumnPath: String = + TwistlyProfile.TwistlyProdProfile.userRecentEngagedStorePath + override val stratoView: Unit = None + + override protected val keyConv: Conv[(UserId, Long)] = Conv.ofType + override protected val viewConv: Conv[Unit] = Conv.ofType + override protected val valueConv: Conv[UserRecentEngagedTweets] = + ScroogeConv.fromStruct[UserRecentEngagedTweets] + + override protected def toStratoKey(userId: UserId): (UserId, Long) = (userId, DefaultVersion) + + override protected def toRawSignals( + userRecentEngagedTweets: UserRecentEngagedTweets + ): Seq[RawSignalType] = + userRecentEngagedTweets.recentEngagedTweets + + override def process( + query: Query, + rawSignals: Future[Option[Seq[RawSignalType]]] + ): Future[Option[Seq[Signal]]] = { + rawSignals.map { + _.map { signals => + val lookBackWindowFilteredSignals = + SignalFilter.lookBackWindow90DayFilter(signals, query.signalType) + lookBackWindowFilteredSignals + .collect { + case RecentEngagedTweet(tweetId, engagedAt, _: ReplyTweetMetadata, _) => + Signal(query.signalType, engagedAt, Some(InternalId.TweetId(tweetId))) + }.take(query.maxResults.getOrElse(Int.MaxValue)) + } + } + } + +} + +object ReplyTweetsFetcher { + // see com.twitter.twistly.store.UserRecentEngagedTweetsStore + private val DefaultVersion = 0 +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/RetweetsFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/RetweetsFetcher.scala new file mode 100644 index 000000000..4b81c8d0b --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/RetweetsFetcher.scala @@ -0,0 +1,74 @@ +package com.twitter.usersignalservice.signals + +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.simclusters_v2.common.UserId +import com.twitter.simclusters_v2.thriftscala.InternalId +import com.twitter.strato.client.Client +import com.twitter.strato.data.Conv +import com.twitter.strato.thrift.ScroogeConv +import com.twitter.twistly.common.TwistlyProfile +import com.twitter.twistly.thriftscala.EngagementMetadata.RetweetMetadata +import com.twitter.twistly.thriftscala.RecentEngagedTweet +import com.twitter.twistly.thriftscala.UserRecentEngagedTweets +import com.twitter.usersignalservice.base.Query +import com.twitter.usersignalservice.base.StratoSignalFetcher +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.util.Future +import com.twitter.util.Timer +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +case class RetweetsFetcher @Inject() ( + stratoClient: Client, + timer: Timer, + stats: StatsReceiver) + extends StratoSignalFetcher[(UserId, Long), Unit, UserRecentEngagedTweets] { + import RetweetsFetcher._ + override type RawSignalType = RecentEngagedTweet + override val name: String = this.getClass.getCanonicalName + override val statsReceiver: StatsReceiver = stats.scope(name) + + override val stratoColumnPath: String = + TwistlyProfile.TwistlyProdProfile.userRecentEngagedStorePath + override val stratoView: Unit = None + + override protected val keyConv: Conv[(UserId, Long)] = Conv.ofType + override protected val viewConv: Conv[Unit] = Conv.ofType + override protected val valueConv: Conv[UserRecentEngagedTweets] = + ScroogeConv.fromStruct[UserRecentEngagedTweets] + + override protected def toStratoKey(userId: UserId): (UserId, Long) = (userId, DefaultVersion) + + override protected def toRawSignals( + userRecentEngagedTweets: UserRecentEngagedTweets + ): Seq[RawSignalType] = + userRecentEngagedTweets.recentEngagedTweets + + override def process( + query: Query, + rawSignals: Future[Option[Seq[RawSignalType]]] + ): Future[Option[Seq[Signal]]] = { + rawSignals.map { + _.map { signals => + val lookBackWindowFilteredSignals = + SignalFilter.lookBackWindow90DayFilter(signals, query.signalType) + lookBackWindowFilteredSignals + .filter { recentEngagedTweet => + recentEngagedTweet.features.statusCounts + .flatMap(_.favoriteCount).exists(_ >= MinFavCount) + }.collect { + case RecentEngagedTweet(tweetId, engagedAt, _: RetweetMetadata, _) => + Signal(query.signalType, engagedAt, Some(InternalId.TweetId(tweetId))) + }.take(query.maxResults.getOrElse(Int.MaxValue)) + } + } + } + +} + +object RetweetsFetcher { + private val MinFavCount = 10 + // see com.twitter.twistly.store.UserRecentEngagedTweetsStore + private val DefaultVersion = 0 +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/SignalFilter.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/SignalFilter.scala new file mode 100644 index 000000000..01be88a26 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/SignalFilter.scala @@ -0,0 +1,48 @@ +package com.twitter.usersignalservice.signals + +import com.twitter.twistly.thriftscala.EngagementMetadata.FavoriteMetadata +import com.twitter.twistly.thriftscala.RecentEngagedTweet +import com.twitter.usersignalservice.thriftscala.SignalType +import com.twitter.util.Time + +// Shared Logic for filtering signal across different signal types +object SignalFilter { + + final val LookBackWindow90DayFilterEnabledSignalTypes: Set[SignalType] = Set( + SignalType.TweetFavorite90dV2, + SignalType.Retweet90dV2, + SignalType.OriginalTweet90dV2, + SignalType.Reply90dV2) + + /* Raw Signal Filter for TweetFavorite, Retweet, Original Tweet and Reply + * Filter out all raw signal if the most recent {Tweet Favorite + Retweet + Original Tweet + Reply} + * is older than 90 days. + * The filter is shared across 4 signal types as they are stored in the same physical store + * thus sharing the same TTL + * */ + def lookBackWindow90DayFilter( + signals: Seq[RecentEngagedTweet], + querySignalType: SignalType + ): Seq[RecentEngagedTweet] = { + if (LookBackWindow90DayFilterEnabledSignalTypes.contains( + querySignalType) && !isMostRecentSignalWithin90Days(signals.head)) { + Seq.empty + } else signals + } + + private def isMostRecentSignalWithin90Days( + signal: RecentEngagedTweet + ): Boolean = { + val diff = Time.now - Time.fromMilliseconds(signal.engagedAt) + diff.inDays <= 90 + } + + def isPromotedTweet(signal: RecentEngagedTweet): Boolean = { + signal match { + case RecentEngagedTweet(_, _, metadata: FavoriteMetadata, _) => + metadata.favoriteMetadata.isAd.getOrElse(false) + case _ => false + } + } + +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/TweetClickFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/TweetClickFetcher.scala new file mode 100644 index 000000000..19462a4e2 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/TweetClickFetcher.scala @@ -0,0 +1,94 @@ +package com.twitter.usersignalservice.signals + +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.simclusters_v2.common.UserId +import com.twitter.simclusters_v2.thriftscala.InternalId +import com.twitter.strato.client.Client +import com.twitter.strato.data.Conv +import com.twitter.strato.thrift.ScroogeConv +import com.twitter.twistly.thriftscala.RecentTweetClickImpressEvents +import com.twitter.twistly.thriftscala.TweetClickImpressEvent +import com.twitter.usersignalservice.base.Query +import com.twitter.usersignalservice.base.StratoSignalFetcher +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.usersignalservice.thriftscala.SignalType +import com.twitter.util.Future +import com.twitter.util.Timer +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +case class TweetClickFetcher @Inject() ( + stratoClient: Client, + timer: Timer, + stats: StatsReceiver) + extends StratoSignalFetcher[(UserId, Long), Unit, RecentTweetClickImpressEvents] { + + import TweetClickFetcher._ + + override type RawSignalType = TweetClickImpressEvent + override val name: String = this.getClass.getCanonicalName + override val statsReceiver: StatsReceiver = stats.scope(name) + + override val stratoColumnPath: String = stratoPath + override val stratoView: Unit = None + + override protected val keyConv: Conv[(UserId, Long)] = Conv.ofType + override protected val viewConv: Conv[Unit] = Conv.ofType + override protected val valueConv: Conv[RecentTweetClickImpressEvents] = + ScroogeConv.fromStruct[RecentTweetClickImpressEvents] + + override protected def toStratoKey(userId: UserId): (UserId, Long) = (userId, defaultVersion) + + override protected def toRawSignals( + stratoValue: RecentTweetClickImpressEvents + ): Seq[TweetClickImpressEvent] = { + stratoValue.events + } + + override def process( + query: Query, + rawSignals: Future[Option[Seq[TweetClickImpressEvent]]] + ): Future[Option[Seq[Signal]]] = + rawSignals.map { events => + events.map { clicks => + clicks + .filter(dwelltimeFilter(_, query.signalType)) + .map(signalFromTweetClick(_, query.signalType)) + .sortBy(-_.timestamp) + .take(query.maxResults.getOrElse(Int.MaxValue)) + } + } +} + +object TweetClickFetcher { + + val stratoPath = "recommendations/twistly/userRecentTweetClickImpress" + private val defaultVersion = 0L + + private val minDwellTimeMap: Map[SignalType, Long] = Map( + SignalType.GoodTweetClick -> 2 * 1000L, + SignalType.GoodTweetClick5s -> 5 * 1000L, + SignalType.GoodTweetClick10s -> 10 * 1000L, + SignalType.GoodTweetClick30s -> 30 * 1000L, + ) + + def signalFromTweetClick( + tweetClickImpressEvent: TweetClickImpressEvent, + signalType: SignalType + ): Signal = { + Signal( + signalType, + tweetClickImpressEvent.engagedAt, + Some(InternalId.TweetId(tweetClickImpressEvent.entityId)) + ) + } + + def dwelltimeFilter( + tweetClickImpressEvent: TweetClickImpressEvent, + signalType: SignalType + ): Boolean = { + val goodClickDwellTime = minDwellTimeMap(signalType) + tweetClickImpressEvent.clickImpressEventMetadata.totalDwellTime >= goodClickDwellTime + } +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/TweetFavoritesFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/TweetFavoritesFetcher.scala new file mode 100644 index 000000000..b427f722f --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/TweetFavoritesFetcher.scala @@ -0,0 +1,86 @@ +package com.twitter.usersignalservice.signals + +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.simclusters_v2.common.UserId +import com.twitter.simclusters_v2.thriftscala.InternalId +import com.twitter.strato.client.Client +import com.twitter.strato.data.Conv +import com.twitter.strato.thrift.ScroogeConv +import com.twitter.twistly.common.TwistlyProfile +import com.twitter.twistly.thriftscala.EngagementMetadata.FavoriteMetadata +import com.twitter.twistly.thriftscala.RecentEngagedTweet +import com.twitter.twistly.thriftscala.UserRecentEngagedTweets +import com.twitter.usersignalservice.base.Query +import com.twitter.usersignalservice.base.StratoSignalFetcher +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.usersignalservice.thriftscala.SignalType +import com.twitter.util.Future +import com.twitter.util.Timer +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +case class TweetFavoritesFetcher @Inject() ( + stratoClient: Client, + timer: Timer, + stats: StatsReceiver) + extends StratoSignalFetcher[(UserId, Long), Unit, UserRecentEngagedTweets] { + import TweetFavoritesFetcher._ + override type RawSignalType = RecentEngagedTweet + override val name: String = this.getClass.getCanonicalName + override val statsReceiver: StatsReceiver = stats.scope(name) + + override val stratoColumnPath: String = + TwistlyProfile.TwistlyProdProfile.userRecentEngagedStorePath + override val stratoView: Unit = None + + override protected val keyConv: Conv[(UserId, Long)] = Conv.ofType + override protected val viewConv: Conv[Unit] = Conv.ofType + override protected val valueConv: Conv[UserRecentEngagedTweets] = + ScroogeConv.fromStruct[UserRecentEngagedTweets] + + override protected def toStratoKey(userId: UserId): (UserId, Long) = (userId, DefaultVersion) + + override protected def toRawSignals( + userRecentEngagedTweets: UserRecentEngagedTweets + ): Seq[RawSignalType] = + userRecentEngagedTweets.recentEngagedTweets + + override def process( + query: Query, + rawSignals: Future[Option[Seq[RawSignalType]]] + ): Future[Option[Seq[Signal]]] = { + rawSignals.map { + _.map { signals => + val lookBackWindowFilteredSignals = + SignalFilter.lookBackWindow90DayFilter(signals, query.signalType) + lookBackWindowFilteredSignals + .filter { recentEngagedTweet => + recentEngagedTweet.features.statusCounts + .flatMap(_.favoriteCount).exists(_ >= MinFavCount) + }.filter { recentEngagedTweet => + applySignalTweetTypeFilter(query.signalType, recentEngagedTweet) + }.collect { + case RecentEngagedTweet(tweetId, engagedAt, _: FavoriteMetadata, _) => + Signal(query.signalType, engagedAt, Some(InternalId.TweetId(tweetId))) + }.take(query.maxResults.getOrElse(Int.MaxValue)) + } + } + } + private def applySignalTweetTypeFilter( + signal: SignalType, + recentEngagedTweet: RecentEngagedTweet + ): Boolean = { + // Perform specific filters for particular signal types. + signal match { + case SignalType.AdFavorite => SignalFilter.isPromotedTweet(recentEngagedTweet) + case _ => true + } + } +} + +object TweetFavoritesFetcher { + private val MinFavCount = 10 + // see com.twitter.twistly.store.UserRecentEngagedTweetsStore + private val DefaultVersion = 0 +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/TweetSharesFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/TweetSharesFetcher.scala new file mode 100644 index 000000000..6205e1bc3 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/TweetSharesFetcher.scala @@ -0,0 +1,77 @@ +package com.twitter.usersignalservice.signals + +import com.twitter.bijection.Codec +import com.twitter.bijection.scrooge.BinaryScalaCodec +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.onboarding.relevance.tweet_engagement.thriftscala.EngagementIdentifier +import com.twitter.onboarding.relevance.tweet_engagement.thriftscala.TweetEngagement +import com.twitter.onboarding.relevance.tweet_engagement.thriftscala.TweetEngagements +import com.twitter.scalding_internal.multiformat.format.keyval.KeyValInjection.Long2BigEndian +import com.twitter.simclusters_v2.thriftscala.InternalId +import com.twitter.storage.client.manhattan.kv.ManhattanKVClientMtlsParams +import com.twitter.storehaus_internal.manhattan.Apollo +import com.twitter.storehaus_internal.manhattan.ManhattanCluster +import com.twitter.twistly.common.UserId +import com.twitter.usersignalservice.base.ManhattanSignalFetcher +import com.twitter.usersignalservice.base.Query +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.usersignalservice.thriftscala.SignalType +import com.twitter.util.Future +import com.twitter.util.Timer +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +case class TweetSharesFetcher @Inject() ( + manhattanKVClientMtlsParams: ManhattanKVClientMtlsParams, + timer: Timer, + stats: StatsReceiver) + extends ManhattanSignalFetcher[Long, TweetEngagements] { + + import TweetSharesFetcher._ + + override type RawSignalType = TweetEngagement + + override def name: String = this.getClass.getCanonicalName + + override def statsReceiver: StatsReceiver = stats.scope(name) + + override protected def manhattanAppId: String = MHAppId + + override protected def manhattanDatasetName: String = MHDatasetName + + override protected def manhattanClusterId: ManhattanCluster = Apollo + + override protected def manhattanKeyCodec: Codec[Long] = Long2BigEndian + + override protected def manhattanRawSignalCodec: Codec[TweetEngagements] = BinaryScalaCodec( + TweetEngagements) + + override protected def toManhattanKey(userId: UserId): Long = userId + + override protected def toRawSignals( + manhattanValue: TweetEngagements + ): Seq[TweetEngagement] = manhattanValue.tweetEngagements + + override def process( + query: Query, + rawSignals: Future[Option[Seq[TweetEngagement]]] + ): Future[Option[Seq[Signal]]] = { + rawSignals.map { + _.map { + _.collect { + case tweetEngagement if (tweetEngagement.engagementType == EngagementIdentifier.Share) => + Signal( + SignalType.TweetShareV1, + tweetEngagement.timestampMs, + Some(InternalId.TweetId(tweetEngagement.tweetId))) + }.sortBy(-_.timestamp).take(query.maxResults.getOrElse(Int.MaxValue)) + } + } + } +} + +object TweetSharesFetcher { + private val MHAppId = "uss_prod_apollo" + private val MHDatasetName = "tweet_share_engagements" +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/VideoTweetsPlayback50Fetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/VideoTweetsPlayback50Fetcher.scala new file mode 100644 index 000000000..1577b2e99 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/VideoTweetsPlayback50Fetcher.scala @@ -0,0 +1,72 @@ +package com.twitter.usersignalservice.signals + +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.twistly.common.UserId +import com.twitter.twistly.thriftscala.UserRecentVideoViewTweets +import com.twitter.twistly.thriftscala.VideoViewEngagementType +import com.twitter.usersignalservice.base.Query +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.util.Future +import com.twitter.util.Timer +import com.twitter.twistly.thriftscala.RecentVideoViewTweet +import com.twitter.usersignalservice.thriftscala.SignalType +import com.twitter.simclusters_v2.thriftscala.InternalId +import com.twitter.strato.client.Client +import com.twitter.strato.data.Conv +import com.twitter.strato.thrift.ScroogeConv +import com.twitter.usersignalservice.base.StratoSignalFetcher +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +case class VideoTweetsPlayback50Fetcher @Inject() ( + stratoClient: Client, + timer: Timer, + stats: StatsReceiver) + extends StratoSignalFetcher[ + (UserId, VideoViewEngagementType), + Unit, + UserRecentVideoViewTweets + ] { + import VideoTweetsPlayback50Fetcher._ + + override type RawSignalType = RecentVideoViewTweet + override def name: String = this.getClass.getCanonicalName + override def statsReceiver: StatsReceiver = stats.scope(name) + + override val stratoColumnPath: String = StratoColumn + override val stratoView: Unit = None + override protected val keyConv: Conv[(UserId, VideoViewEngagementType)] = Conv.ofType + override protected val viewConv: Conv[Unit] = Conv.ofType + override protected val valueConv: Conv[UserRecentVideoViewTweets] = + ScroogeConv.fromStruct[UserRecentVideoViewTweets] + + override protected def toStratoKey(userId: UserId): (UserId, VideoViewEngagementType) = + (userId, VideoViewEngagementType.VideoPlayback50) + + override protected def toRawSignals( + stratoValue: UserRecentVideoViewTweets + ): Seq[RecentVideoViewTweet] = stratoValue.recentEngagedTweets + + override def process( + query: Query, + rawSignals: Future[Option[Seq[RecentVideoViewTweet]]] + ): Future[Option[Seq[Signal]]] = rawSignals.map { + _.map { + _.filter(videoView => + !videoView.isPromotedTweet && videoView.videoDurationSeconds >= MinVideoDurationSeconds) + .map { rawSignal => + Signal( + SignalType.VideoView90dPlayback50V1, + rawSignal.engagedAt, + Some(InternalId.TweetId(rawSignal.tweetId))) + }.take(query.maxResults.getOrElse(Int.MaxValue)) + } + } + +} + +object VideoTweetsPlayback50Fetcher { + private val StratoColumn = "recommendations/twistly/userRecentVideoViewTweetEngagements" + private val MinVideoDurationSeconds = 10 +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/VideoTweetsQualityViewFetcher.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/VideoTweetsQualityViewFetcher.scala new file mode 100644 index 000000000..d513b978c --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/VideoTweetsQualityViewFetcher.scala @@ -0,0 +1,72 @@ +package com.twitter.usersignalservice.signals + +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.twistly.common.UserId +import com.twitter.twistly.thriftscala.UserRecentVideoViewTweets +import com.twitter.twistly.thriftscala.VideoViewEngagementType +import com.twitter.usersignalservice.base.Query +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.util.Future +import com.twitter.util.Timer +import com.twitter.twistly.thriftscala.RecentVideoViewTweet +import com.twitter.usersignalservice.thriftscala.SignalType +import com.twitter.simclusters_v2.thriftscala.InternalId +import com.twitter.strato.client.Client +import com.twitter.strato.data.Conv +import com.twitter.strato.thrift.ScroogeConv +import com.twitter.usersignalservice.base.StratoSignalFetcher +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +case class VideoTweetsQualityViewFetcher @Inject() ( + stratoClient: Client, + timer: Timer, + stats: StatsReceiver) + extends StratoSignalFetcher[ + (UserId, VideoViewEngagementType), + Unit, + UserRecentVideoViewTweets + ] { + import VideoTweetsQualityViewFetcher._ + override type RawSignalType = RecentVideoViewTweet + override def name: String = this.getClass.getCanonicalName + override def statsReceiver: StatsReceiver = stats.scope(name) + + override val stratoColumnPath: String = StratoColumn + override val stratoView: Unit = None + override protected val keyConv: Conv[(UserId, VideoViewEngagementType)] = Conv.ofType + override protected val viewConv: Conv[Unit] = Conv.ofType + override protected val valueConv: Conv[UserRecentVideoViewTweets] = + ScroogeConv.fromStruct[UserRecentVideoViewTweets] + + override protected def toStratoKey(userId: UserId): (UserId, VideoViewEngagementType) = + (userId, VideoViewEngagementType.VideoQualityView) + + override protected def toRawSignals( + stratoValue: UserRecentVideoViewTweets + ): Seq[RecentVideoViewTweet] = stratoValue.recentEngagedTweets + + override def process( + query: Query, + rawSignals: Future[Option[Seq[RecentVideoViewTweet]]] + ): Future[Option[Seq[Signal]]] = { + rawSignals.map { + _.map { + _.filter(videoView => + !videoView.isPromotedTweet && videoView.videoDurationSeconds >= MinVideoDurationSeconds) + .map { rawSignal => + Signal( + SignalType.VideoView90dQualityV1, + rawSignal.engagedAt, + Some(InternalId.TweetId(rawSignal.tweetId))) + }.take(query.maxResults.getOrElse(Int.MaxValue)) + } + } + } +} + +object VideoTweetsQualityViewFetcher { + private val StratoColumn = "recommendations/twistly/userRecentVideoViewTweetEngagements" + private val MinVideoDurationSeconds = 10 +} diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/common/BUILD b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/common/BUILD new file mode 100644 index 000000000..baca538b0 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/common/BUILD @@ -0,0 +1,15 @@ +scala_library( + compiler_option_sets = ["fatal_warnings"], + tags = ["bazel-compatible"], + dependencies = [ + "hermit/hermit-core/src/main/scala/com/twitter/hermit/predicate/socialgraph", + "src/scala/com/twitter/simclusters_v2/common", + "src/scala/com/twitter/twistly/common", + "src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift-scala", + "src/thrift/com/twitter/socialgraph:thrift-scala", + "user-signal-service/thrift/src/main/thrift:thrift-scala", + "util/util-core:util-core-util", + "util/util-core/src/main/java/com/twitter/util", + "util/util-stats/src/main/scala/com/twitter/finagle/stats", + ], +) diff --git a/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/common/SGSUtils.scala b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/common/SGSUtils.scala new file mode 100644 index 000000000..01fbd8f38 --- /dev/null +++ b/user-signal-service/server/src/main/scala/com/twitter/usersignalservice/signals/common/SGSUtils.scala @@ -0,0 +1,59 @@ +package com.twitter.usersignalservice.signals +package common + +import com.twitter.simclusters_v2.thriftscala.InternalId +import com.twitter.socialgraph.thriftscala.EdgesRequest +import com.twitter.socialgraph.thriftscala.EdgesResult +import com.twitter.socialgraph.thriftscala.PageRequest +import com.twitter.socialgraph.thriftscala.RelationshipType +import com.twitter.socialgraph.thriftscala.SocialGraphService +import com.twitter.socialgraph.thriftscala.SrcRelationship +import com.twitter.twistly.common.UserId +import com.twitter.usersignalservice.thriftscala.Signal +import com.twitter.usersignalservice.thriftscala.SignalType +import com.twitter.util.Duration +import com.twitter.util.Future +import com.twitter.util.Time + +object SGSUtils { + val MaxNumSocialGraphSignals = 200 + val MaxAge: Duration = Duration.fromDays(90) + + def getSGSRawSignals( + userId: UserId, + sgsClient: SocialGraphService.MethodPerEndpoint, + relationshipType: RelationshipType, + signalType: SignalType, + ): Future[Option[Seq[Signal]]] = { + val edgeRequest = EdgesRequest( + relationship = SrcRelationship(userId, relationshipType), + pageRequest = Some(PageRequest(count = None)) + ) + val now = Time.now.inMilliseconds + + sgsClient + .edges(Seq(edgeRequest)) + .map { sgsEdges => + sgsEdges.flatMap { + case EdgesResult(edges, _, _) => + edges.collect { + case edge if edge.createdAt >= now - MaxAge.inMilliseconds => + Signal( + signalType, + timestamp = edge.createdAt, + targetInternalId = Some(InternalId.UserId(edge.target))) + } + } + } + .map { signals => + signals + .take(MaxNumSocialGraphSignals) + .groupBy(_.targetInternalId) + .mapValues(_.maxBy(_.timestamp)) + .values + .toSeq + .sortBy(-_.timestamp) + } + .map(Some(_)) + } +} diff --git a/user-signal-service/thrift/src/main/thrift/BUILD b/user-signal-service/thrift/src/main/thrift/BUILD new file mode 100644 index 000000000..faab4af7e --- /dev/null +++ b/user-signal-service/thrift/src/main/thrift/BUILD @@ -0,0 +1,20 @@ +create_thrift_libraries( + base_name = "thrift", + sources = [ + "client_identifier.thrift", + "service.thrift", + "signal.thrift", + ], + platform = "java8", + tags = ["bazel-compatible"], + dependency_roots = [ + "src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift", + ], + generate_languages = [ + "java", + "scala", + "strato", + ], + provides_java_name = "uss-thrift-java", + provides_scala_name = "uss-thrift-scala", +) diff --git a/user-signal-service/thrift/src/main/thrift/client_identifier.thrift b/user-signal-service/thrift/src/main/thrift/client_identifier.thrift new file mode 100644 index 000000000..c953e6b8f --- /dev/null +++ b/user-signal-service/thrift/src/main/thrift/client_identifier.thrift @@ -0,0 +1,22 @@ +namespace java com.twitter.usersignalservice.thriftjava +namespace py gen.twitter.usersignalservice.service +#@namespace scala com.twitter.usersignalservice.thriftscala +#@namespace strato com.twitter.usersignalservice.strato + +# ClientIdentifier should be defined as ServiceId_Product +enum ClientIdentifier { + # reserve 1-10 for CrMixer + CrMixer_Home = 1 + CrMixer_Notifications = 2 + CrMixer_Email = 3 + # reserve 11-20 for RSX + RepresentationScorer_Home = 11 + RepresentationScorer_Notifications = 12 + + # reserve 21-30 for Explore + ExploreRanker = 21 + + # We will throw an exception after we make sure all clients are sending the + # ClientIdentifier in their request. + Unknown = 9999 +} diff --git a/user-signal-service/thrift/src/main/thrift/service.thrift b/user-signal-service/thrift/src/main/thrift/service.thrift new file mode 100644 index 000000000..a10959ea8 --- /dev/null +++ b/user-signal-service/thrift/src/main/thrift/service.thrift @@ -0,0 +1,23 @@ +namespace java com.twitter.usersignalservice.thriftjava +namespace py gen.twitter.usersignalservice.service +#@namespace scala com.twitter.usersignalservice.thriftscala +#@namespace strato com.twitter.usersignalservice.strato + +include "signal.thrift" +include "client_identifier.thrift" + +struct SignalRequest { + 1: optional i64 maxResults + 2: required signal.SignalType signalType +} + +struct BatchSignalRequest { + 1: required i64 userId(personalDataType = "UserId") + 2: required list signalRequest + # make sure to populate the clientId, otherwise the service would throw exceptions + 3: optional client_identifier.ClientIdentifier clientId +}(hasPersonalData='true') + +struct BatchSignalResponse { + 1: required map> signalResponse +} diff --git a/user-signal-service/thrift/src/main/thrift/signal.thrift b/user-signal-service/thrift/src/main/thrift/signal.thrift new file mode 100644 index 000000000..e32947be8 --- /dev/null +++ b/user-signal-service/thrift/src/main/thrift/signal.thrift @@ -0,0 +1,113 @@ +namespace java com.twitter.usersignalservice.thriftjava +namespace py gen.twitter.usersignalservice.signal +#@namespace scala com.twitter.usersignalservice.thriftscala +#@namespace strato com.twitter.usersignalservice.strato + +include "com/twitter/simclusters_v2/identifier.thrift" + + +enum SignalType { + /** + Please maintain the key space rule to avoid compatibility issue for the downstream production job + * Prod Key space: 0-1000 + * Devel Key space: 1000+ + **/ + + + /* tweet based signals */ + TweetFavorite = 0, // 540 Days Looback window + Retweet = 1, // 540 Days Lookback window + TrafficAttribution = 2, + OriginalTweet = 3, // 540 Days Looback window + Reply = 4, // 540 Days Looback window + /* Tweets that the user shared (sharer side) + * V1: successful shares (click share icon -> click in-app, or off-platform share option + * or copying link) + * */ + TweetShare_V1 = 5, // 14 Days Lookback window + + TweetFavorite_90D_V2 = 6, // 90 Days Lookback window : tweet fav from user with recent engagement in the past 90 days + Retweet_90D_V2 = 7, // 90 Days Lookback window : retweet from user with recent engagement in the past 90 days + OriginalTweet_90D_V2 = 8, // 90 Days Lookback window : original tweet from user with recent engagement in the past 90 days + Reply_90D_V2 = 9,// 90 Days Lookback window : reply from user with recent engagement in the past 90 days + GoodTweetClick = 10,// GoodTweetCilick Signal : Dwell Time Threshold >=2s + + // video tweets that were watched (10s OR 95%) in the past 90 days, are not ads, and have >=10s video + VideoView_90D_Quality_V1 = 11 // 90 Days Lookback window + // video tweets that were watched 50% in the past 90 days, are not ads, and have >=10s video + VideoView_90D_Playback50_V1 = 12 // 90 Days Lookback window + + /* user based signals */ + AccountFollow = 100, // infinite lookback window + RepeatedProfileVisit_14D_MinVisit2_V1 = 101, + RepeatedProfileVisit_90D_MinVisit6_V1 = 102, + RepeatedProfileVisit_180D_MinVisit6_V1 = 109, + RepeatedProfileVisit_14D_MinVisit2_V1_No_Negative = 110, + RepeatedProfileVisit_90D_MinVisit6_V1_No_Negative = 111, + RepeatedProfileVisit_180D_MinVisit6_V1_No_Negative = 112, + RealGraphOon = 104, + TrafficAttributionProfile_30D_LastVisit = 105, + TrafficAttributionProfile_30D_DecayedVisit = 106, + TrafficAttributionProfile_30D_WeightedEventDecayedVisit = 107, + TrafficAttributionProfile_30D_DecayedVisit_WithoutAgathaFilter = 108, + GoodProfileClick = 120, // GoodTweetCilick Signal : Dwell Time Threshold >=10s + AdFavorite = 121, // Favorites filtered to ads TweetFavorite has both organic and ads Favs + + // AccountFollowWithDelay should only be used by high-traffic clients and has 1 min delay + AccountFollowWithDelay = 122, + + + /* notifications based signals */ + /* V1: notification clicks from past 90 days with negative events (reports, dislikes) being filtered */ + NotificationOpenAndClick_V1 = 200, + + /* + negative signals for filter + */ + NegativeEngagedTweetId = 901 // tweetId for all negative engagements + NegativeEngagedUserId = 902 // userId for all negative engagements + AccountBlock = 903, + AccountMute = 904, + // skip 905 - 906 for Account report abuse / report spam + // User clicked dont like from past 90 Days + TweetDontLike = 907 + // User clicked see fewer on the recommended tweet from past 90 Days + TweetSeeFewer = 908 + // User clicked on the "report tweet" option in the tweet caret dropdown menu from past 90 days + TweetReport = 909 + + /* + devel signals + use the num > 1000 to test out signals under development/ddg + put it back to the correct corresponding Key space (0-1000) before ship + */ + GoodTweetClick_5s = 1001,// GoodTweetCilick Signal : Dwell Time Threshold >=5s + GoodTweetClick_10s = 1002,// GoodTweetCilick Signal : Dwell Time Threshold >=10s + GoodTweetClick_30s = 1003,// GoodTweetCilick Signal : Dwell Time Threshold >=30s + + GoodProfileClick_20s = 1004,// GoodProfileClick Signal : Dwell Time Threshold >=20s + GoodProfileClick_30s = 1005,// GoodProfileClick Signal : Dwell Time Threshold >=30s + + GoodProfileClick_Filtered = 1006, // GoodProfileClick Signal filtered by blocks and mutes. + GoodProfileClick_20s_Filtered = 1007// GoodProfileClick Signal : Dwell Time Threshold >=20s, filtered byblocks and mutes. + GoodProfileClick_30s_Filtered = 1008,// GoodProfileClick Signal : Dwell Time Threshold >=30s, filtered by blocks and mutes. + + /* + Unified Signals + These signals are aimed to unify multiple signal fetches into a single response. + This might be a healthier way for our retrievals layer to run inference on. + */ + TweetBasedUnifiedUniformSignal = 1300 + TweetBasedUnifiedEngagementWeightedSignal = 1301 + TweetBasedUnifiedQualityWeightedSignal = 1302 + ProducerBasedUnifiedUniformSignal = 1303 + ProducerBasedUnifiedEngagementWeightedSignal = 1304 + ProducerBasedUnifiedQualityWeightedSignal = 1305 + +} + +struct Signal { + 1: required SignalType signalType + 2: required i64 timestamp + 3: optional identifier.InternalId targetInternalId +}