import os import itertools import subprocess import math SERVICE_NAME = 'rekey-uua' CPU_NUM = 3 HEAP_SIZE = 3 * GB RAM_SIZE = HEAP_SIZE + 1 * GB # We make disk size larger than HEAP so that if we ever need to do a heap dump, it will fit on disk. DISK_SIZE = HEAP_SIZE + 2 * GB class Profile(Struct): package = Default(String, SERVICE_NAME) cmdline_flags = Default(String, '') log_level = Default(String, 'INFO') instances = Default(Integer, 100) kafka_bootstrap_servers = Default(String, '/s/kafka/bluebird-1:kafka-tls') kafka_bootstrap_servers_remote_dest = Default(String, '/s/kafka/bluebird-1:kafka-tls') source_topic = Default(String, 'unified_user_actions') sink_topics = Default(String, 'uua_keyed') decider_overlay = Default(String, '') resources = Resources( cpu = CPU_NUM, ram = RAM_SIZE, disk = DISK_SIZE ) install = Packer.install( name = '{{profile.package}}', version = Workflows.package_version() ) async_profiler_install = Packer.install( name = 'async-profiler', role = 'csl-perf', version = 'latest' ) setup_jaas_config = Process( name = 'setup_jaas_config', cmdline = ''' mkdir -p jaas_config echo "KafkaClient { com.sun.security.auth.module.Krb5LoginModule required principal=\\"discode@TWITTER.BIZ\\" useKeyTab=true storeKey=true keyTab=\\"/var/lib/tss/keys/fluffy/keytabs/client/discode.keytab\\" doNotPrompt=true; };" >> jaas_config/jaas.conf ''' ) main = JVMProcess( name = SERVICE_NAME, jvm = Java11( heap = HEAP_SIZE, extra_jvm_flags = '-Djava.net.preferIPv4Stack=true' ' -XX:+UseNUMA' ' -XX:+AggressiveOpts' ' -XX:+PerfDisableSharedMem' # http://www.evanjones.ca/jvm-mmap-pause.html ' -Dlog_level={{profile.log_level}}' ' -Dlog.access.output=access.log' ' -Dlog.service.output={{name}}.log' ' -Djava.security.auth.login.config=jaas_config/jaas.conf' ), arguments = '-jar {{name}}-bin.jar' ' -admin.port=:{{thermos.ports[health]}}' ' -kafka.bootstrap.servers={{profile.kafka_bootstrap_servers}}' ' -kafka.bootstrap.servers.remote.dest={{profile.kafka_bootstrap_servers_remote_dest}}' ' -kafka.group.id={{name}}-{{environment}}' ' -kafka.producer.client.id={{name}}-{{environment}}' ' -kafka.max.pending.requests=10000' ' -kafka.consumer.fetch.max=1.megabytes' ' -kafka.producer.batch.size=16.kilobytes' ' -kafka.producer.buffer.mem=128.megabytes' ' -kafka.producer.linger=50.milliseconds' ' -kafka.producer.request.timeout=30.seconds' ' -kafka.producer.compression.type=lz4' ' -kafka.worker.threads=5' ' -kafka.source.topic={{profile.source_topic}}' ' -kafka.sink.topics={{profile.sink_topics}}' ' -decider.base=decider.yml' ' -decider.overlay={{profile.decider_overlay}}' ' -cluster={{cluster}}' ' {{profile.cmdline_flags}}', resources = resources ) stats = Stats( library = 'metrics', port = 'admin' ) job_template = Service( name = SERVICE_NAME, role = 'discode', instances = '{{profile.instances}}', contact = 'disco-data-eng@twitter.com', constraints = {'rack': 'limit:1', 'host': 'limit:1'}, announce = Announcer( primary_port = 'health', portmap = {'aurora': 'health', 'admin': 'health'} ), task = Task( resources = resources, name = SERVICE_NAME, processes = [async_profiler_install, install, setup_jaas_config, main, stats], constraints = order(async_profiler_install, install, setup_jaas_config, main) ), health_check_config = HealthCheckConfig( initial_interval_secs = 100, interval_secs = 60, timeout_secs = 60, max_consecutive_failures = 4 ), update_config = UpdateConfig( batch_size = 100, watch_secs = 90, max_per_shard_failures = 3, max_total_failures = 0, rollback_on_failure = False ) ) PRODUCTION = Profile( # go/uua-decider decider_overlay = '/usr/local/config/overlays/discode-default/UnifiedUserActions/prod/{{cluster}}/decider_overlay.yml' ) STAGING = Profile( package = SERVICE_NAME+'-staging', cmdline_flags = '', kafka_bootstrap_servers_remote_dest = '/srv#/devel/local/kafka/ingestion-1:kafka-tls', decider_overlay = '/usr/local/config/overlays/discode-default/UnifiedUserActions/staging/{{cluster}}/decider_overlay.yml' # go/uua-decider ) DEVEL = STAGING( log_level = 'DEBUG', ) prod_job = job_template( tier = 'preferred', environment = 'prod', ).bind(profile = PRODUCTION) staging_job = job_template( environment = 'staging' ).bind(profile = STAGING) devel_job = job_template( environment = 'devel' ).bind(profile = DEVEL) jobs = [] for cluster in ['atla', 'pdxa']: jobs.append(prod_job(cluster = cluster)) jobs.append(staging_job(cluster = cluster)) jobs.append(devel_job(cluster = cluster))