12. ELK + Kafka + Filebeat
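
Filebeat runs as a DaemonSet on every node and ships container logs into per-namespace Kafka topics; Logstash consumes those topics and writes daily indices to Elasticsearch, which Kibana then queries. The Elasticsearch and Kibana sections below are placeholders: the Logstash outputs simply assume an existing cluster at 10.62.1.130:9200.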

1. Elasticsearch


2. Kibana


3. Kafka

persistent-volume.yaml

kind: PersistentVolume
apiVersion: v1
metadata:
  name: k8s-pv-zk1
  # PersistentVolumes are cluster-scoped; a namespace field here is ignored by the API server
  namespace: viomi-kafka
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
  labels:
    type: local
spec:
  capacity:
    storage: 3Gi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: "/data/zookeeper"
  persistentVolumeReclaimPolicy: Recycle
---
kind: PersistentVolume
apiVersion: v1
metadata:
  name: k8s-pv-zk2
  namespace: viomi-kafka
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
  labels:
    type: local
spec:
  capacity:
    storage: 3Gi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: "/data/zookeeper"
  persistentVolumeReclaimPolicy: Recycle
---
kind: PersistentVolume
apiVersion: v1
metadata:
  name: k8s-pv-zk3
  namespace: viomi-kafka
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
  labels:
    type: local
spec:
  capacity:
    storage: 3Gi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: "/data/zookeeper"
  persistentVolumeReclaimPolicy: Recycle
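
A minimal sketch of applying and checking the volumes, assuming kubectl points at the target cluster and the manifest is saved as persistent-volume.yaml as above:

$ kubectl create namespace viomi-kafka        # if it does not exist yet
$ kubectl apply -f persistent-volume.yaml
$ kubectl get pv | grep k8s-pv-zk             # all three should report STATUS Available

Note that all three PVs point at the same hostPath, /data/zookeeper, on whichever node each claim ends up bound to, so that directory has to exist and be writable on every candidate node.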

zookeeper.yaml

apiVersion: v1
kind: Service
metadata:
  name: zk-hs
  namespace: viomi-kafka
  labels:
    app: zk
spec:
  ports:
  - port: 2888
    name: server
  - port: 3888
    name: leader-election
  clusterIP: None
  selector:
    app: zk
---
apiVersion: v1
kind: Service
metadata:
  name: zk-cs
  namespace: viomi-kafka
  labels:
    app: zk
spec:
  ports:
  - port: 2181
    name: client
  selector:
    app: zk
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: zk-pdb
  namespace: viomi-kafka
spec:
  selector:
    matchLabels:
      app: zk
  maxUnavailable: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zk
  namespace: viomi-kafka
spec:
  selector:
    matchLabels:
      app: zk
  serviceName: zk-hs
  replicas: 3
  updateStrategy:
    type: RollingUpdate
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: zk
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                    - zk
              topologyKey: "kubernetes.io/hostname"
      containers:
      - name: kubernetes-zookeeper
        imagePullPolicy: IfNotPresent
        image: "hub.kce.ksyun.com/yunmi-infra/viomi/zookeeper:3.4.10"
        resources:
          requests:
            memory: "100Mi"
            cpu: "0.1"
        ports:
        - containerPort: 2181
          name: client
        - containerPort: 2888
          name: server
        - containerPort: 3888
          name: leader-election
        command:
        - sh
        - -c
        - "start-zookeeper \
          --servers=3 \
          --data_dir=/var/lib/zookeeper/data \
          --data_log_dir=/var/lib/zookeeper/data/log \
          --conf_dir=/opt/zookeeper/conf \
          --client_port=2181 \
          --election_port=3888 \
          --server_port=2888 \
          --tick_time=2000 \
          --init_limit=10 \
          --sync_limit=5 \
          --heap=512M \
          --max_client_cnxns=60 \
          --snap_retain_count=3 \
          --purge_interval=12 \
          --max_session_timeout=40000 \
          --min_session_timeout=4000 \
          --log_level=INFO"
        readinessProbe:
          exec:
            command:
            - sh
            - -c
            - "zookeeper-ready 2181"
          initialDelaySeconds: 10
          timeoutSeconds: 5
        livenessProbe:
          exec:
            command:
            - sh
            - -c
            - "zookeeper-ready 2181"
          initialDelaySeconds: 10
          timeoutSeconds: 5
        volumeMounts:
        - name: datadir
          # mount the PVC where start-zookeeper actually writes (--data_dir=/var/lib/zookeeper/data);
          # the original mountPath of /data/zookeeper would leave the ZooKeeper data outside the volume
          mountPath: /var/lib/zookeeper
      securityContext:
        runAsUser: 1000
        fsGroup: 1000
  volumeClaimTemplates:
  - metadata:
      name: datadir
      annotations:
        volume.beta.kubernetes.io/storage-class: "anything"
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 3Gi
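
With the volumes in place, the ensemble can be brought up and checked; a sketch, reusing the zookeeper-ready helper that the manifest itself uses for its probes:

$ kubectl apply -f zookeeper.yaml
$ kubectl get pods -l app=zk -n viomi-kafka                     # zk-0, zk-1, zk-2 should reach Running
$ for i in 0 1 2; do kubectl exec zk-$i -n viomi-kafka -- zookeeper-ready 2181; done

The Kafka broker StatefulSet below connects to this ensemble through the zk-N.zk-hs pod DNS names.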

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: viomi-kafka
spec:
  selector:
    matchLabels:
      app: kafka
  serviceName: kafka-svc
  replicas: 3
  template:
    metadata:
      labels:
        app: kafka
    spec:
      nodeSelector:
        travis.io/schedule-only: "kafka"
      tolerations:
      - key: "travis.io/schedule-only"
        operator: "Equal"
        value: "kafka"
        effect: "NoSchedule"
      - key: "travis.io/schedule-only"
        operator: "Equal"
        value: "kafka"
        effect: "NoExecute"
        tolerationSeconds: 3600
      - key: "travis.io/schedule-only"
        operator: "Equal"
        value: "kafka"
        effect: "PreferNoSchedule"
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                    - kafka
              topologyKey: "kubernetes.io/hostname"
        podAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 1
              podAffinityTerm:
                labelSelector:
                  matchExpressions:
                    - key: "app"
                      operator: In
                      values:
                      - zk
                topologyKey: "kubernetes.io/hostname"
      terminationGracePeriodSeconds: 300
      containers:
      - name: k8s-kafka
        imagePullPolicy: Always
        # image: hub.kce.ksyun.com/yunmi-infra/viomi/kafka:latest
        image: hub.kce.ksyun.com/yunmi-infra/viomi/viomi-kafka:2.11-1.1.1
        resources:
          requests:
            memory: "600Mi"
            cpu: 500m
        ports:
        - containerPort: 9092
          name: server
        # note: no volume is mounted for /var/lib/kafka, so broker data lives in the
        # container filesystem and is lost when the pod is rescheduled
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
          --override listeners=PLAINTEXT://:9092 \
          --override zookeeper.connect=zk-0.zk-hs.viomi-kafka.svc.cluster.local:2181,zk-1.zk-hs.viomi-kafka.svc.cluster.local:2181,zk-2.zk-hs.viomi-kafka.svc.cluster.local:2181 \
          --override log.dir=/var/lib/kafka \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=false \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=12 \
          --override log.roll.hours=12 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override inter.broker.protocol.version=0.11.0.3 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=4 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000 "
        env:
        - name: KAFKA_HEAP_OPTS
          value: "-Xmx512M -Xms512M"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=INFO"
        readinessProbe:
          tcpSocket:
            port: 9092
          timeoutSeconds: 1
          initialDelaySeconds: 5
      securityContext:
        runAsUser: 0
        fsGroup: 1000
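
The StatefulSet above declares serviceName: kafka-svc, and the Filebeat DaemonSet later pins kafka-N.kafka-svc.viomi-kafka.svc.cluster.local host names, but no such Service appears in this section. A minimal headless Service that would satisfy those references might look like the following sketch (name, namespace and selector are taken from the manifests above):

apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  namespace: viomi-kafka
  labels:
    app: kafka
spec:
  clusterIP: None        # headless, so each broker gets a stable per-pod DNS name
  ports:
  - port: 9092
    name: server
  selector:
    app: kafka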

apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper-cluster-service-1
  name: zookeeper-cluster1
  namespace: viomi-kafka
spec:
  ports:
  - name: client
    port: 2181
    protocol: TCP
  - name: follower
    port: 2888
    protocol: TCP
  - name: leader
    port: 3888
    protocol: TCP
  selector:
    app: zookeeper-cluster-service-1
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper-cluster-service-2
  name: zookeeper-cluster2
  namespace: viomi-kafka
spec:
  ports:
  - name: client
    port: 2181
    protocol: TCP
  - name: follower
    port: 2888
    protocol: TCP
  - name: leader
    port: 3888
    protocol: TCP
  selector:
    app: zookeeper-cluster-service-2
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper-cluster-service-3
  name: zookeeper-cluster3
  namespace: viomi-kafka
spec:
  ports:
  - name: client
    port: 2181
    protocol: TCP
  - name: follower
    port: 2888
    protocol: TCP
  - name: leader
    port: 3888
    protocol: TCP
  selector:
    app: zookeeper-cluster-service-3
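
The three zookeeper-clusterN Services above select app: zookeeper-cluster-service-N, a label none of the pods defined in this section carry, so as written they have no endpoints; they look like leftovers from a separate ZooKeeper deployment. Once the brokers themselves are running, a quick smoke test from inside kafka-0 might look like this (a sketch; kafka-topics.sh ships with the Kafka 1.1.1 distribution in the image):

$ kubectl exec -it kafka-0 -n viomi-kafka -- kafka-topics.sh \
    --zookeeper zk-cs.viomi-kafka.svc.cluster.local:2181 \
    --create --topic smoke-test --partitions 1 --replication-factor 3
$ kubectl exec -it kafka-0 -n viomi-kafka -- kafka-topics.sh \
    --zookeeper zk-cs.viomi-kafka.svc.cluster.local:2181 --list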

kafka_manager.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-manager
  namespace: viomi-kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-manager
  template:
    metadata:
      labels:
        app: kafka-manager
    spec:
      containers:
        - image: zenko/kafka-manager
          #basicAuth:
          #  enabled: false
          name: kafka-manager
          ports:
          - name: kafka-manager
            containerPort: 9000
            protocol: TCP
          env:
          - name: ZK_HOSTS
            # zoo1/zoo2/zoo3 must resolve to the ZooKeeper ensemble; with the manifests
            # above that would be zk-cs.viomi-kafka:2181 or the zk-N.zk-hs pod DNS names
            value: "zoo1:2181,zoo2:2181,zoo3:2181"
---
apiVersion: v1
kind: Service
metadata:
  #annotations:
  #  service.beta.kubernetes.io/ksc-loadbalancer-id: 63405a34-2875-4b4b-b169-ae37b285100e
  labels:
    app: kafka-manager
  name: kafka-manager-server
  namespace: viomi-kafka
spec:
  ports:
  - name: "9000"
    #nodePort: 32662
    port: 9000
    protocol: TCP
    targetPort: 9000
  selector:
    app: kafka-manager
  type: ClusterIP
---
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  annotations:
    kubernetes.io/ingress.class: traefik
  name: kafka-manager-ingress
  namespace: viomi-kafka
spec:
  rules:
  - host: kafka-manager.viomi.com.cn
    http:
      paths:
      - backend:
          serviceName: kafka-manager-server
          servicePort: 9000
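
With the Traefik Ingress above, the UI is served at kafka-manager.viomi.com.cn; a cluster still has to be registered manually in the interface, pointing at the ZooKeeper ensemble (for these manifests, zk-cs.viomi-kafka:2181). Without DNS for that host, a port-forward is a quick way to reach it (a sketch):

$ kubectl port-forward svc/kafka-manager-server 9000:9000 -n viomi-kafka
# then browse to http://localhost:9000 and add the cluster with the ZK hosts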

4. Filebeat

apiVersion: v1
kind: ConfigMap
metadata:
  name: filebeat-config
  namespace: kube-system
  labels:
    k8s-app: filebeat
data:
  filebeat.yml: |-
    processors:
    - add_cloud_metadata: ~
    - add_docker_metadata: ~
    logging.level: error
    filebeat.inputs:
    - type: container
      scan_frequency: 1s
      backoff_factor: 2
      backoff: 1s
      tail_files: true
      max_backoff: 30s
      spool_size: 2048
      paths:
        - /data/docker/containers/*/*-json.log
        - /data/docker/containers/*/*-json.log-*
      include_lines: ['INFO','ERROR','WARN','DEBUG']
      #multiline.pattern: '^\[[0-9]{4}-[0-9]{2}|^\['
      multiline.pattern: '^\[[0-9]{4}-[0-9]{2}-[0-9]{2}|^\[0-9]{4}\/|^[[:space:]]+|^Caused by:'
      multiline.negate: false
      multiline.match: after
      multiline.max_lines: 150
      processors:
        - add_kubernetes_metadata:
            host: ${NODE_NAME}
            matchers:
            - logs_path:
                logs_path: "/data/docker/containers/"
    output.kafka:
      version: "1.1.1"
      enabled: true
      hosts: ["10.62.1.135:9092"]
      topic: "elk_kafka"
      topics:
        - topic: "yunmi-infra"
          when.contains:
            kubernetes.namespace: "yunmi-infra"
        - topic: "yunmi-trade"
          when.contains:
            kubernetes.namespace: "yunmi-trade"
        - topic: "yunmi-business"
          when.contains:
            kubernetes.namespace: "yunmi-business"
        - topic: "yunmi-front"
          when.contains:
            kubernetes.namespace: "yunmi-front"
        - topic: "yunmi-vwater"
          when.contains:
            kubernetes.namespace: "yunmi-vwater"
        - topic: "yunmi-bigdata"
          when.contains:
            kubernetes.namespace: "yunmi-bigdata"
        - topic: "kube-system"
          when.contains:
            kubernetes.namespace: "kube-system"
      partition.round_robin:
        reachable_only: true
      required_acks: 0
      compression: gzip
      compression_level: 1
      max_message_bytes: 10000000
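
The DaemonSet below runs under serviceAccountName: filebeat, and the add_kubernetes_metadata processor needs read access to pod metadata, but no ServiceAccount or RBAC objects are included in this section. A minimal sketch of what is typically added, modelled on the stock Filebeat Kubernetes manifests (the object names are assumptions):

apiVersion: v1
kind: ServiceAccount
metadata:
  name: filebeat
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: filebeat
rules:
- apiGroups: [""]
  resources: ["namespaces", "pods", "nodes"]
  verbs: ["get", "watch", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: filebeat
subjects:
- kind: ServiceAccount
  name: filebeat
  namespace: kube-system
roleRef:
  kind: ClusterRole
  name: filebeat
  apiGroup: rbac.authorization.k8s.io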

apiVersion: apps/v1
kind: DaemonSet
metadata:
  labels:
    app: filebeat
    id: filebeat
  name: filebeat
  namespace: kube-system
spec:
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: filebeat
      id: filebeat
  template:
    metadata:
      annotations:
        cattle.io/timestamp: "2019-11-28T07:38:18Z"
      labels:
        app: filebeat
        id: filebeat
      name: filebeat
    spec:
      hostAliases:
      - ip: "10.62.1.135"
        hostnames:
        - "kafka-0.kafka-svc.viomi-kafka.svc.cluster.local"
        - "kafka-1.kafka-svc.viomi-kafka.svc.cluster.local"
        - "kafka-2.kafka-svc.viomi-kafka.svc.cluster.local"
      serviceAccountName: filebeat
      containers:
      - image: hub.kce.ksyun.com/yunmi-infra/viomi/viomi-filebeat:latest
        imagePullPolicy: Always
        name: filebeat
        args: [
          "-c", "/etc/filebeat.yml",
          "-e",
        ]
        env:
        - name: NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        resources:
          limits:
            cpu: "3"
            memory: 3000Mi
          requests:
            cpu: 300m
            memory: 300Mi
        securityContext:
          privileged: true
          procMount: Default
          runAsUser: 0
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /data/docker/containers
          name: containers
        - name: config
          mountPath: /etc/filebeat.yml
          readOnly: true
          subPath: filebeat.yml
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      terminationGracePeriodSeconds: 30
      volumes:
      - hostPath:
          path: /data/docker/containers
          type: ""
        name: containers
      - name: config
        configMap:
          defaultMode: 0600
          name: filebeat-config
  updateStrategy:
    type: OnDelete
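
A sketch of checking that the agents are running and that events actually reach Kafka (the topic names follow the ConfigMap above; kafka-console-consumer.sh ships with the broker image):

$ kubectl get ds filebeat -n kube-system                      # DESIRED and READY should match the node count
$ kubectl logs ds/filebeat -n kube-system --tail=20           # watch for output/kafka connection errors
$ kubectl exec -it kafka-0 -n viomi-kafka -- kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 --topic yunmi-infra --max-messages 5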

5. Logstash

logstash.yaml        

apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app: viomi-logstash
  name: viomi-logstash
  namespace: kube-system
spec:
  podManagementPolicy: OrderedReady
  replicas: 9
  selector:
    matchLabels:
      app: viomi-logstash
      id: viomi-logstash
  serviceName: elasticsearch-logging
  template:
    metadata:
      labels:
        app: viomi-logstash
        id: viomi-logstash
    spec:
      containers:
      - args:
        - /usr/share/logstash/bin/logstash -f /etc/logstash/conf/logstash.conf
        command:
        - /bin/sh
        - -c
        image: hub.kce.ksyun.com/yunmi-infra/viomi/viomi_logstash_latest:5.6.4
        imagePullPolicy: IfNotPresent
        name: viomi-logstash
        resources:
          limits:
            memory: 2000Mi
          requests:
            memory: 512Mi
        volumeMounts:
        - mountPath: /etc/logstash/conf/
          name: conf
      dnsPolicy: ClusterFirst
      initContainers:
      # the init container only needs a shell: it picks the pipeline config for this pod
      # from the StatefulSet ordinal in $HOSTNAME and copies it into the shared emptyDir
      - command:
        - bash
        - -c
        - |
          hostname=`echo $HOSTNAME | awk -F '-' '{print $NF}'`
          if [[ $hostname -eq 0 ]]; then
            cp /mnt/conf.d/logstash_infra.conf /etc/logstash/conf/logstash.conf && cat /etc/logstash/conf/logstash.conf
          fi
          if [[ $hostname -eq 1 ]]; then
            cp /mnt/conf.d/logstash_trade.conf /etc/logstash/conf/logstash.conf
          fi
          if [[ $hostname -eq 2 ]]; then
            cp /mnt/conf.d/logstash_business.conf /etc/logstash/conf/logstash.conf
          fi
          if [[ $hostname -eq 3 ]]; then
            cp /mnt/conf.d/logstash_other.conf /etc/logstash/conf/logstash.conf
          fi
          if [[ $hostname -eq 4 ]]; then
            cp /mnt/conf.d/logstash_test.conf /etc/logstash/conf/logstash.conf
          fi
          if [[ $hostname -eq 5 ]]; then
            cp /mnt/conf.d/logstash_infra.conf /etc/logstash/conf/logstash.conf
          fi
          if [[ $hostname -eq 6 ]]; then
            cp /mnt/conf.d/logstash_trade.conf /etc/logstash/conf/logstash.conf
          fi
          if [[ $hostname -eq 7 ]]; then
            cp /mnt/conf.d/logstash_business.conf /etc/logstash/conf/logstash.conf
          fi
          if [[ $hostname -eq 8 ]]; then
            cp /mnt/conf.d/logstash_test.conf /etc/logstash/conf/logstash.conf
          fi
        image: hub.kce.ksyun.com/yunmi-infra/viomi/rocketmq:broker-4.5.0
        imagePullPolicy: IfNotPresent
        name: init-broker
        resources:
          limits:
            memory: 1000Mi
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /mnt/conf.d/
          name: viomi-logstash-config
        - mountPath: /etc/logstash/conf/
          name: conf
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext:
        fsGroup: 1000
        runAsUser: 0
      terminationGracePeriodSeconds: 30
      volumes:
      - configMap:
          defaultMode: 420
          name: viomi-logstash-config
        name: viomi-logstash-config
      - emptyDir: {}
        name: conf

logstash_conf.yaml


apiVersion: v1
kind: ConfigMap
metadata:
  name: viomi-logstash-config
  namespace: kube-system
  labels:
    k8s-app: logstash-config
data:
  logstash_infra.conf: |-
    input {
      kafka {
        bootstrap_servers => "10.62.1.135:9092"
        auto_offset_reset => "latest"
        #consumer_threads => 6
        decorate_events => true
        partition_assignment_strategy => "org.apache.kafka.clients.consumer.RoundRobinAssignor"
        group_id  => "logstash-infra"
        topics => ["yunmi-infra"]
        codec => json {
          charset => "UTF-8"
        }
      }
    }
    filter {
      json {
        source => "message"
      }
      mutate {
        remove_field => ["kafka","stream","tags","log","ecs","@version","input","tag","fields","agent", "[kubernetes][node]","[kubernetes][pod][uid]","host","[kubernetes][labels]","[kubernetes][container][image]","[kubernetes][agent]","[kubernetes][replicaset]"]
      }
    }
    output {
      elasticsearch {
        hosts => ["10.62.1.130:9200"]
        index => "logstash-%{[kubernetes][namespace]}-%{[kubernetes][container][name]}-%{+YYYY.MM.dd}"
      }
    }
  logstash_business.conf: |-
    input {
      kafka {
        bootstrap_servers => "10.62.1.135:9092"
        auto_offset_reset => "latest"
        #consumer_threads => 6
        decorate_events => true
        partition_assignment_strategy => "org.apache.kafka.clients.consumer.RoundRobinAssignor"
        group_id  => "logstash-business"
        topics => ["yunmi-business"]
        codec => json {
          charset => "UTF-8"
        }
      }
    }
    filter {
      json {
        source => "message"
      }
      mutate {
        remove_field => ["kafka","stream","tags","log","ecs","@version","input","tag","fields","agent", "[kubernetes][node]","[kubernetes][pod][uid]","host","[kubernetes][labels]","[kubernetes][container][image]","[kubernetes][agent]","[kubernetes][replicaset]"]
      }
    }
    output {
      elasticsearch {
        hosts => ["10.62.1.130:9200"]
        index => "logstash-%{[kubernetes][namespace]}-%{[kubernetes][container][name]}-%{+YYYY.MM.dd}"
      }
    }
  logstash_trade.conf: |-
    input {
      kafka {
        bootstrap_servers => "10.62.1.135:9092"
        auto_offset_reset => "latest"
        #consumer_threads => 6
        decorate_events => true
        partition_assignment_strategy => "org.apache.kafka.clients.consumer.RoundRobinAssignor"
        group_id  => "logstash-trade"
        topics => ["yunmi-trade"]
        codec => json {
          charset => "UTF-8"
        }
      }
    }
    filter {
      json {
        source => "message"
      }
      mutate {
        remove_field => ["kafka","stream","tags","log","ecs","@version","input","tag","fields","agent", "[kubernetes][node]","[kubernetes][pod][uid]","host","[kubernetes][labels]","[kubernetes][container][image]","[kubernetes][agent]","[kubernetes][replicaset]"]
      }
    }
    output {
      elasticsearch {
        hosts => ["10.62.1.130:9200"]
        index => "logstash-%{[kubernetes][namespace]}-%{[kubernetes][container][name]}-%{+YYYY.MM.dd}"
      }
    }
  logstash_other.conf: |-
    input {
      kafka {
        bootstrap_servers => "10.62.1.135:9092"
        auto_offset_reset => "latest"
        #consumer_threads => 6
        decorate_events => true
        partition_assignment_strategy => "org.apache.kafka.clients.consumer.RoundRobinAssignor"
        group_id  => "logstash-other"
        topics => ["prod_elk_kafka","kafka_topic","kube-system","elk_kafka","yunmi-front","yunmi-vwater","yunmi-bigdata"]
        codec => json {
          charset => "UTF-8"
        }
      }
    }
    filter {
      json {
        source => "message"
      }
      mutate {
        remove_field => ["kafka","stream","tags","log","ecs","@version","input","tag","fields","agent", "[kubernetes][node]","[kubernetes][pod][uid]","host","[kubernetes][labels]","[kubernetes][container][image]","[kubernetes][agent]","[kubernetes][replicaset]"]
      }
    }
    output {
      elasticsearch {
        hosts => ["10.62.1.130:9200"]
        index => "logstash-%{[kubernetes][namespace]}-%{[kubernetes][container][name]}-%{+YYYY.MM.dd}"
      }
    }
  logstash_test.conf: |-
    input {
      kafka {
        bootstrap_servers => "10.62.1.135:9092"
        auto_offset_reset => "latest"
        #consumer_threads => 6
        decorate_events => true
        group_id  => "logstash-test-grp"
        topics => ["test_elk_kafka"]
        codec => json {
          charset => "UTF-8"
        }
      }
    }
    filter {
      json {
        source => "message"
      }
      mutate {
        remove_field => ["kafka","stream","tags","log","ecs","@version","input","tag","fields","agent", "[kubernetes][node]","[kubernetes][pod][uid]","host","[kubernetes][labels]","[kubernetes][container][image]","[kubernetes][agent]","[kubernetes][replicaset]"]
      }
    }
    output {
      elasticsearch {
        hosts => ["10.62.1.130:9200"]
      }
    }
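
A sketch of rolling out the Logstash tier and confirming that indices are created (the Elasticsearch address comes from the pipeline outputs above; the ConfigMap must exist before the StatefulSet pods start):

$ kubectl apply -f logstash_conf.yaml
$ kubectl apply -f logstash.yaml
$ kubectl get pods -l app=viomi-logstash -n kube-system
$ curl -s 'http://10.62.1.130:9200/_cat/indices/logstash-*?v'

In Kibana, an index pattern such as logstash-* then picks up the per-namespace, per-container daily indices produced by the output template.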
