aws服务从入门到精通| Amazon Kinesis 服务之Firehose操作

kinesis简介(什么是Kinesis)

Amazon Kinesis 可以轻松收集、处理和分析实时视频和数据流

  • 1、使用 Kinesis可以捕获,处理,存储video stream 用来分析和机器学习。
  • 2、使用 Kinesis 构建 自定义应用程序分析数据流,或者使用流行的流处理框架
  • 3、使用 firehose 加载流,处理 流的存储
  • 4,使用 Kinesis Data Analytics 与SQL数据流分析

由于作者英语太烂了,所以附上亚马逊的原文的英文如下:

Amazon Kinesis makes it easy to collect, process, and analyze video and data streams in real time.

  • 1、Use Kinesis Video Streams to capture, process, and store video streams for analytics and machine learning.
  • 2、Use Kinesis Data Streams to build custom applications that analyze data streams using popular stream processing frameworks.
  • 3、Use Kinesis Data Firehose to load data streams into AWS data stores.
  • 4、Use Kinesis Data Analytics to analyze data streams with SQL.

kinesis 操作界面简介:

控制面板

很显然kinesis 只有四大部分组成:

  • 1、Data Streams:
    借助 Amazon Kinesis Data Streams,您可以构建用于处理或分析流数据的自定义应用程序,以满足特定需求。您可以配置数以万计的数据创建器,连续不断地将数据输入 Kinesis 数据流。例如,来自网站点击流、应用程序日志和社交媒体馈送内容的数据。在不到一秒时间里,您的 Amazon Kinesis 应用程序便可以从数据流读取和处理数据。
图片
  • 2、Data Firehose:
    Amazon Kinesis Data Firehose 是将流数据加载到数据存储和分析工具的最简单方式。Kinesis Data Firehose 是一项完全托管的服务,让您可以轻松地从数十万个来源捕获、转换大量流数据并将其加载到 Amazon S3、Amazon Redshift、Amazon Elasticsearch Service、Kinesis Data Analytics 和 Splunk 中,从而实现近乎实时的分析与见解

  • 3、Data Analytics:
    Amazon Kinesis Data Analytics 是使用 ANSI 标准 SQL 实时处理和分析流数据最简单的方法。借助该产品,您能够从 Amazon Kinesis Data Streams 和 Amazon Kinesis Data Firehose 中读取数据,且可以使用 SQL 构建流处理查询或整个应用程序,从而在收到数据后持续进行筛选、转换和整合。Amazon Kinesis Data Analytics 自动识别标准数据格式、解析数据并推荐架构,这样您便可以使用交互式架构编辑器进行编辑。它提供交互式 SQL 编辑器和流处理模板,这样您就可以在几分钟内构建复杂的流处理应用程序。Amazon Kinesis Data Analytics 会在您的流式处理应用程序中持续运行查询,并将处理结果写入输出目标 (如 Amazon Kinesis Data Streams 和 Amazon Kinesis Data Firehose),从而将数据传输至 Amazon S3、Amazon Redshift 和 Amazon Elasticsearch Service。Amazon Kinesis Data Analytics 可以自动预置、部署和扩展运行您的流式处理应用程序所需的资源

  • 4、Video Streams:
    Amazon Kinesis Video Streams 是一项完全托管的视频提取和存储服务。借助它,您可以为支持机器人、智能城市、工业自动化、安全监控、机器学习 (ML) 等功能的应用程序安全地提取、处理和存储任意规模的视频。Kinesis Video Streams 还可以接收其他类型的时间编码数据,例如音频、RADAR 和 LIDAR 信号。Kinesis Video Streams 为您提供了可安装在您设备上的软件开发工具包,从而可以轻松安全地将视频流式传输到 AWS。Kinesis Video Streams 可以自动预置和弹性扩展从数百万台设备中提取视频流所需的所有基础设施。它还持久地存储视频流并对其进行加密和编制索引,而且提供了易于使用的 API,因此应用程序可以基于标签和时间戳来访问和检索已编制索引的视频片段。Kinesis Video Streams 提供了一个库来将 ML 框架 (例如 Apache MxNet、Tensorflow 和 OpenCV) 与视频流集成以构建机器学习应用程序。Kinesis Video Streams 与 Amazon Rekognition Video 集成,从而使您能够构建用于检测和识别流视频中的人脸的计算机视觉应用程序。

以上就是Kinesis 的四个功能所有的服务,读者可以根据实际场景进行选取对应的服务进行操作。我今天主要讲的操作服务是Firehose

Firehose的界面操作部分:

  • 1 、打开Firehose的操作界面。会出现 存在的delivery stream


    image.png

-2、点击 create delivery stream 按钮,出现以下界面:

image.png
  • 3.存储数据之前可以选择是否用lamda函数处理。符合的数据存储。
image.png
  • 4.选择存储的工具。s3、redshift、elasticsearch、splunk
image.png
  • 5、存储文件的格式,缓存多大的时候进行写入,错误日志收集,IAM角色的设置
界面

image.png
  • 6、确认信息界面:


    image.png
  • 7、查看结果


    image.png
image.png

Firehose代码操作:

  • 1.导入pom.xml文件:
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-kinesis-client</artifactId>
            <version>1.7.5</version>
        </dependency>
  • 2.初始化流操作
 /**
     * 初始化流
     */
    @SuppressWarnings("deprecation")
    public static void initClients() {
        AWSCredentials credentials = null;
        try {
            credentials = new ProfileCredentialsProvider().getCredentials();
        } catch (Exception e) {
            throw new AmazonClientException(
                    "Cannot load the credentials from the credential profiles file. "
                            + "Please make sure that your credentials file is at the correct "
                            + "location (~/.aws/credentials), and is in valid format.",
                    e);
        }

        // Firehose client
        firehoseClient = new AmazonKinesisFirehoseClient(credentials);
        firehoseClient.setRegion(RegionUtils.getRegion(FIRE_HOSE_REGION));

    }
  • 3、单个流的写入
    /**
     * 
     * 单个写流
     */
    public static void addFireHose(Record record, String deliveryStreamName) {
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setDeliveryStreamName(deliveryStreamName);
        putRecordRequest.setRecord(record);
        firehoseClient.putRecord(putRecordRequest);
    }
  • 4、批量流的写入
  /**
     * 批量添加数据到流里面
     */
    public static void addBatchFireHose(List<Record> records,
            String deliveryStreamName) {
        try {
            putRecordBatch(records, deliveryStreamName);
        } catch (Exception e) {
            LOGGER.error("写流错误" + e);
        }
    }


    private static PutRecordBatchResult putRecordBatch(List<Record> recordList,
            String deliveryStreamName) {
        PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
        putRecordBatchRequest.setDeliveryStreamName(deliveryStreamName);
        putRecordBatchRequest.setRecords(recordList);
        return firehoseClient.putRecordBatch(putRecordBatchRequest);
    }

  • 4、更新流配置
    /**
     * 更新流配置
     *
     * @throws Exception
     */
    public static void updateDeliveryStream(String deliveryOpenStreamName,
            String s3DestinationUpdateName) throws Exception {
        DeliveryStreamDescription deliveryStreamDescription = describeDeliveryStream(deliveryOpenStreamName);
        LOGGER.info("Updating DeliveryStream Destination: "
                + deliveryOpenStreamName + " with new configuration options");
        // get(0) -> DeliveryStream currently supports only one destination per
        // DeliveryStream
        UpdateDestinationRequest updateDestinationRequest = new UpdateDestinationRequest()
                .withDeliveryStreamName(deliveryOpenStreamName)
                .withCurrentDeliveryStreamVersionId(
                        deliveryStreamDescription.getVersionId())
                .withDestinationId(
                        deliveryStreamDescription.getDestinations().get(0)
                                .getDestinationId());

        S3DestinationUpdate s3DestinationUpdate = new S3DestinationUpdate();
        s3DestinationUpdate.withPrefix(s3DestinationUpdateName);

        updateDestinationRequest.setS3DestinationUpdate(s3DestinationUpdate);

        firehoseClient.updateDestination(updateDestinationRequest);
    }

    /**
     * Method to describe the delivery stream.
     *
     * @param deliveryStreamName
     *            the delivery stream
     * @return the delivery description
     */
    private static DeliveryStreamDescription describeDeliveryStream(
            String deliveryStreamName) {
        DescribeDeliveryStreamRequest describeDeliveryStreamRequest = new DescribeDeliveryStreamRequest();
        describeDeliveryStreamRequest
                .withDeliveryStreamName(deliveryStreamName);
        DescribeDeliveryStreamResult describeDeliveryStreamResponse = firehoseClient
                .describeDeliveryStream(describeDeliveryStreamRequest);
        return describeDeliveryStreamResponse.getDeliveryStreamDescription();
    }

完整 代码:

package com.sdk.wifi.util.aws.firehose;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
import com.amazonaws.services.kinesisfirehose.model.DeliveryStreamDescription;
import com.amazonaws.services.kinesisfirehose.model.DescribeDeliveryStreamRequest;
import com.amazonaws.services.kinesisfirehose.model.DescribeDeliveryStreamResult;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest;
import com.amazonaws.services.kinesisfirehose.model.Record;
import com.amazonaws.services.kinesisfirehose.model.S3DestinationUpdate;
import com.amazonaws.services.kinesisfirehose.model.UpdateDestinationRequest;

public class FireHoseUtil {

    private static final Log LOGGER = LogFactory.getLog(FireHoseUtil.class);

    // DeliveryStream properties
    private static AmazonKinesisFirehoseClient firehoseClient;
    private static final String FIRE_HOSE_REGION = "us-west-2";

    /**
     * 批量添加数据到流里面
     */
    public static void addBatchFireHose(List<Record> records,
            String deliveryStreamName) {
        try {
            putRecordBatch(records, deliveryStreamName);
        } catch (Exception e) {
            LOGGER.error("写流错误" + e);
        }
    }

    /**
     * 
     * 单个写流
     */
    public static void addFireHose(Record record, String deliveryStreamName) {
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setDeliveryStreamName(deliveryStreamName);
        putRecordRequest.setRecord(record);
        firehoseClient.putRecord(putRecordRequest);
    }

    /**
     * 字符串边record
     * 
     * @param data
     * @return
     * @throws UnsupportedEncodingException 
     */
    public static Record createRecord(String data) throws UnsupportedEncodingException {
        return new Record().withData(ByteBuffer.wrap(data.getBytes("UTF-8")));
    }

    /**
     * 初始化流
     */
    public static void initClients() {
        AWSCredentials credentials = null;
        try {
            credentials = new ProfileCredentialsProvider().getCredentials();
        } catch (Exception e) {
            throw new AmazonClientException(
                    "Cannot load the credentials from the credential profiles file. "
                            + "Please make sure that your credentials file is at the correct "
                            + "location (~/.aws/credentials), and is in valid format.",
                    e);
        }

        // Firehose client
        firehoseClient = new AmazonKinesisFirehoseClient(credentials);
        firehoseClient.setRegion(RegionUtils.getRegion(FIRE_HOSE_REGION));

    }

    /**
     * 批量写流
     * 
     * @param recordList
     * @return
     */
    private static PutRecordBatchResult putRecordBatch(List<Record> recordList,
            String deliveryStreamName) {
        PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
        putRecordBatchRequest.setDeliveryStreamName(deliveryStreamName);
        putRecordBatchRequest.setRecords(recordList);
        return firehoseClient.putRecordBatch(putRecordBatchRequest);
    }

    /**
     * 更新流配置
     *
     * @throws Exception
     */
    public static void updateDeliveryStream(String deliveryOpenStreamName,
            String s3DestinationUpdateName) throws Exception {
        DeliveryStreamDescription deliveryStreamDescription = describeDeliveryStream(deliveryOpenStreamName);
        LOGGER.info("Updating DeliveryStream Destination: "
                + deliveryOpenStreamName + " with new configuration options");
        // get(0) -> DeliveryStream currently supports only one destination per
        // DeliveryStream
        UpdateDestinationRequest updateDestinationRequest = new UpdateDestinationRequest()
                .withDeliveryStreamName(deliveryOpenStreamName)
                .withCurrentDeliveryStreamVersionId(
                        deliveryStreamDescription.getVersionId())
                .withDestinationId(
                        deliveryStreamDescription.getDestinations().get(0)
                                .getDestinationId());

        S3DestinationUpdate s3DestinationUpdate = new S3DestinationUpdate();
        s3DestinationUpdate.withPrefix(s3DestinationUpdateName);

        updateDestinationRequest.setS3DestinationUpdate(s3DestinationUpdate);

        firehoseClient.updateDestination(updateDestinationRequest);
    }

    /**
     * Method to describe the delivery stream.
     *
     * @param deliveryStreamName
     *            the delivery stream
     * @return the delivery description
     */
    private static DeliveryStreamDescription describeDeliveryStream(
            String deliveryStreamName) {
        DescribeDeliveryStreamRequest describeDeliveryStreamRequest = new DescribeDeliveryStreamRequest();
        describeDeliveryStreamRequest
                .withDeliveryStreamName(deliveryStreamName);
        DescribeDeliveryStreamResult describeDeliveryStreamResponse = firehoseClient
                .describeDeliveryStream(describeDeliveryStreamRequest);
        return describeDeliveryStreamResponse.getDeliveryStreamDescription();
    }

}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,463评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,868评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,213评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,666评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,759评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,725评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,716评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,484评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,928评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,233评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,393评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,073评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,718评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,308评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,538评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,338评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,260评论 2 352

推荐阅读更多精彩内容