1. Log collection based on NGINX and Thrift RPC
1.1 Package preparation
Prerequisite environment: Hadoop + Flume + HBase + ZooKeeper
Installation packages:
Link: https://pan.baidu.com/s/1zikEFn_bLkgj1DwTMM7cxg
Extraction code: 8wde
1.2 Thrift RPC installation
Task 1: get the standalone Python version of Thrift working
step1: Install Thrift (download, build)
Why install a Thrift RPC environment:
1) Security: the data is encoded in an opaque binary format
2) Compression: smaller payloads save bandwidth and improve performance
3) Decoupling: commonly used for communication between internal services
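The bandwidth point can be sketched with plain Python `struct` (not Thrift itself; this only illustrates the general idea that a fixed-width binary encoding is smaller than a decimal string):

```python
import struct

# A 32-bit integer always occupies 4 bytes in a binary encoding,
# while its decimal-string form grows with the number of digits.
value = 1234567890
binary = struct.pack('!i', value)    # network byte order, fixed 4 bytes
text = str(value).encode('ascii')    # one byte per digit

print(len(binary), len(text))
```

Thrift's TBinaryProtocol applies the same fixed-width encoding to the integer fields of a struct.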
Unpack the tarball:
]# tar xvzf thrift-0.9.3.tar.gz
After unpacking, change into the root of the Thrift source tree.
step2: Install the dependency libraries (via yum) needed for the source build:
]# yum install boost-devel-static libboost-dev libboost-test-dev libboost-program-options-dev libevent-dev automake libtool flex bison pkg-config g++ libssl-dev ant
step3: Building a C/C++ source package is usually 3 steps:
1) ./configure --with-cpp --with-boost --with-python --without-csharp --with-java --without-erlang --without-perl --without-php --without-php_extension --without-ruby --without-haskell --without-go
This run fails (see the screenshot); the log shows the cause is a missing C++ compiler.
Install a C++ compiler: yum install gcc-c++
Re-run ./configure; it still fails.
Fix: install openssl and openssl-devel (CentOS):
]# yum -y install openssl openssl-devel
Run ./configure again; this time it succeeds.
2) make
3) make install
Thrift is now installed.
1.3 Using Thrift from Python
step1: Make sure Python has the matching module:
]# pip install thrift==0.9.3
If that fails, install from the Tsinghua mirror instead, which succeeds:
[root@master src]# pip install thrift==0.9.3 -i https://pypi.tuna.tsinghua.edu.cn/simple/
step2: Create the interface definition file:
]# cat RecSys.thrift
service RecSys {
    string rec_data(1:string data)
}
step3: Generate the Python interface code from the IDL file (Python side: interface module; C++ side: server):
]# thrift --gen py RecSys.thrift
step4: Write the Python server and client code.
server.py
# coding=utf-8
import sys
sys.path.append('../schema/gen-py')

from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer

from RecSys import RecSys
from RecSys.ttypes import *

class RecSysHandler(RecSys.Iface):
    def rec_data(self, a):
        print "Receive: %s" % (a)
        return "I'm OK !!!"

if __name__ == "__main__":
    # Instantiate the handler
    handler = RecSysHandler()
    # Set up the processor
    processor = RecSys.Processor(handler)
    # Set up the listening port
    transport = TSocket.TServerSocket('localhost', port=9090)
    # Set up the transport layer
    tfactory = TTransport.TBufferedTransportFactory()
    # Set up the wire protocol
    pfactory = TBinaryProtocol.TBinaryProtocolFactory()
    server = TServer.TThreadedServer(processor, transport, tfactory, pfactory)
    print 'Starting the server...'
    server.serve()
    print 'done.'
client.py
# coding=utf-8
import sys
sys.path.append('../schema/gen-py')

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

from RecSys import RecSys

try:
    # Set up the socket
    transport = TSocket.TSocket('localhost', port=9090)
    # Set up the transport layer
    transport = TTransport.TBufferedTransport(transport)
    # Set up the wire protocol
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = RecSys.Client(protocol)
    transport.open()
    rst = client.rec_data("are you ok!!!")
    print "receive return data: ", rst
    transport.close()
except Thrift.TException, ex:
    print "%s" % (ex.message)
The TServer.TThreadedServer line is what starts a service on local port 9090.
1) First start the server and let it wait for data:
]# python server.py
This fails with an import error.
It turns out a submodule and the current Python file both imported the same module, so remove the duplicate import from the Python file:
Start it again:
2) Send data from the client:
]# python client.py
Running it under Python 3 fails, so switch to Python 2.7 and run it again.
Client output:
Server output:
That completes the standalone Python use of Thrift.
1.4 Getting the standalone C++ version of Thrift working
step1: Generate the C++ interface code:
]# thrift --gen cpp RecSys.thrift
step2: Change into gen-cpp/ and compile directly:
]# g++ -I/usr/local/include/thrift/ -lthrift RecSys_server.skeleton.cpp RecSys.cpp RecSys_constants.cpp -o server
This produces a binary named server; file server confirms it is an executable.
Check the port number in the skeleton: vim RecSys_server.skeleton.cpp
The default port is 9090.
step3: Start the service with ./server; it fails (the Thrift shared library cannot be found at runtime).
Fix: edit ~/.bashrc or ~/.bash_profile (or the system-wide /etc/profile) and add:
export LD_LIBRARY_PATH=/usr/local/lib
export PATH=$LD_LIBRARY_PATH:$PATH
Then run source .bashrc to apply the change.
Run ./server again; no error this time. Check the port: nc -tunlp | grep 9090
step4: Test it with the Python client:
]# python client.py
The client gets an empty response back, because the server skeleton has not been modified yet.
Contents of RecSys_server.skeleton.cpp before the change:
Now edit it:
]# vim RecSys_server.skeleton.cpp
void rec_data(std::string& _return, const std::string& data) {
    // Your implementation goes here
    // printf("rec_data\n");
    std::cout << "Receive Data: " << data << std::endl;

    _return = "I'm OK !!!";
}
After the change, recompile:
]# g++ -I/usr/local/include/thrift/ -lthrift RecSys_server.skeleton.cpp RecSys.cpp RecSys_constants.cpp -o server
Run ./server and python client.py again.
Server output:
Client output:
The C++ server is done; what remains is the C++ client.
step5: Next, develop the client:
client_demo.cpp
#include "RecSys.h"
#include <iostream>
#include <string>
#include <transport/TSocket.h>
#include <transport/TBufferTransports.h>
#include <protocol/TBinaryProtocol.h>

using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace std;

int main(int argc, char **argv) {
    boost::shared_ptr<TSocket> socket(new TSocket("localhost", 9090));
    boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
    boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));

    transport->open();

    RecSysClient client(protocol);
    string query_str = "are you okokok!!!";
    string recevie_data;
    client.rec_data(recevie_data, query_str);
    std::cout << "Receive data:" << recevie_data << endl;

    transport->close();
}
Compile:
]# g++ -I/usr/local/include/thrift/ -lthrift client_demo.cpp RecSys.cpp RecSys_constants.cpp -o client_demo
With the original server still running, run the client: ./client_demo
Server output:
Client output:
Now test communication between the C++ client and the Python server:
First start the Python server: python server.py
Then start the C++ client: ./client_demo
Server output:
Client output:
That completes Thrift client and server development in C++.
1.5 Setting up the NGINX server
step1: The build is the usual 3 steps:
1) ./configure --prefix=/usr/local/nginx/
2) make
3) make install
step2: Start nginx from the /usr/local/nginx directory:
]# ./sbin/nginx
]# netstat -antup | grep 80
Visit:
http://192.168.87.10/
http://master/
1.6 A standalone server with CGI
CGI (Common Gateway Interface): a CGI program is developed behind the web server and can access resources on the host machine.
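Concretely, the web server hands the request's query string to the CGI program through the QUERY_STRING environment variable; a minimal Python sketch of the parsing involved:

```python
from urllib.parse import parse_qs

# QUERY_STRING as nginx would pass it to the CGI program
query_str = "userid=111&item=222&action=click"
params = parse_qs(query_str)

print(params["userid"][0], params["action"][0])
```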
step1: Download fcgi-2.4.1-SNAP-0910052249.tar.gz (the link is given at the top of this article).
step2: After unpacking, edit one header file: ]# vim include/fcgio.h (around line 35)
step3: Build, the usual 3 steps:
1) ./configure
2) make
3) make install
step4: Develop a CGI demo.
test.cpp:
#include <iostream>
#include <string>
#include <stdio.h>
#include <stdlib.h>
#include <fcgi_stdio.h>
#include <fcgiapp.h>

using namespace std;

inline void send_response(
        FCGX_Request& request, const string& resp_str) {
    FCGX_FPrintF(request.out, "Content-type: text/html;charset=utf-8\r\n\r\n");
    FCGX_FPrintF(request.out, "%s", resp_str.c_str());
    FCGX_Finish_r(&request);
}

int main(int argc, char **argv) {
    FCGX_Init();
    // The incoming request
    FCGX_Request request;
    FCGX_InitRequest(&request, 0, 0);
    while (FCGX_Accept_r(&request) >= 0) {
        string query_str = FCGX_GetParam("QUERY_STRING", request.envp);
        cout << "query str: " << query_str << endl;
        send_response(request, query_str);
    }
    return 0;
}
Compile: ]# g++ test.cpp -lfcgi -o test
Running test on its own is not useful; it is just a small program. To complete the CGI demo we need to install spawn-fcgi and let it host the process.
step5: Download the hosting tool (also linked earlier in this article): wget https://github.com/lighttpd/spawn-fcgi/archive/spawn-fcgi-1.6.4.tar.gz
step6: Build, in 4 steps:
1) ./autogen.sh
2) ./configure
3) make
4) make install
step7: Use spawn-fcgi to host our CGI demo binary (test):
]# /usr/local/bin/spawn-fcgi -a 127.0.0.1 -p 8099 -f test
Running this in the current directory fails; switching to another directory and re-running succeeds.
Check the port: ]# netstat -antup | grep 8099
The next question is how to forward NGINX's port 80 to 8099, which requires configuring NGINX.
1.7 NGINX configuration
step1:
Configure nginx's reverse proxy. In /usr/local/nginx/conf, add to nginx.conf:
location ~ /behavior_recsys$ {
    fastcgi_pass 127.0.0.1:8099;
    include fastcgi_params;
}
step2:
Reload the configuration:
]# ./sbin/nginx -s reload
step3:
Test in a browser: http://master/behavior_recsys?userid=111&item=222&action=click
The nginx log also looks normal.
The request data comes straight back to the page because our CGI demo (test.cpp) simply echoes the request it receives.
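The same manual browser test can be built programmatically (the host name `master` comes from this setup; adjust for your environment):

```python
from urllib.parse import urlencode

# Parameters from the browser test above
params = {"userid": 111, "item": 222, "action": "click"}
url = "http://master/behavior_recsys?" + urlencode(params)

print(url)
```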
1.8 Writing user-behavior logs to local files with Google glog
glog severity levels (glog defines four): FATAL (highest) > ERROR > WARNING > INFO (lowest)
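Python's standard `logging` module uses the same ordering idea (an analogy for glog's severities, not glog itself): each level has a numeric value, and a logger configured at one level drops everything less severe.

```python
import logging

# Severity values increase monotonically with severity
levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
numeric = [getattr(logging, name) for name in levels]

print(numeric)
```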
step1: Install glog:
1) ./configure
2) make
3) make install
step2: Develop the client; compared with the previous version, it now also writes each request into a log file.
#include <iostream>
#include <string>
#include <stdio.h>
#include <stdlib.h>
#include <fcgi_stdio.h>
#include <fcgiapp.h>
#include <glog/logging.h>

using namespace std;

inline void send_response(
        FCGX_Request& request, const string& resp_str) {
    FCGX_FPrintF(request.out, "Content-type: text/html;charset=utf-8\r\n\r\n");
    FCGX_FPrintF(request.out, "%s", resp_str.c_str());
    FCGX_Finish_r(&request);
}

int main(int argc, char **argv) {
    FCGX_Init();
    FCGX_Request request;
    FCGX_InitRequest(&request, 0, 0);

    FLAGS_log_dir = "/root/7_codes/logserver_test/cgi_demo/logs";
    FLAGS_max_log_size = 100;
    FLAGS_logbufsecs = 0;
    google::InitGoogleLogging(argv[0]);

    while (FCGX_Accept_r(&request) >= 0) {
        string query_str = FCGX_GetParam("QUERY_STRING", request.envp);
        cout << "query str: " << query_str << endl;
        LOG(INFO) << query_str;
        LOG(WARNING) << query_str;
        LOG(ERROR) << query_str;
        send_response(request, query_str);
    }
    return 0;
}
step3: Create a logs directory in the current directory: mkdir logs
step4: Compile: ]# g++ -lglog -lfcgi client.cpp -o client (now linking against glog)
step5: Kill the previous CGI process.
step6: Hand the new binary to the hosting service: ]# /usr/local/bin/spawn-fcgi -a 127.0.0.1 -p 8099 -f /usr/local/src/logserver_base/cgi_demo/client
step7: Verify in a browser: http://192.168.36.101/behavior_recsys?userid=111&item=222&action=click
The log output can now be observed:
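glog file lines follow a fixed layout: a severity letter (I/W/E/F), the date and time, the thread id, `file:line`, then `] ` and the message. A small Python sketch that pulls the query string back out of such a line (the sample line below is illustrative, not copied from a real run):

```python
import re

# severity + MMDD + time, thread id, file:line, "] ", message
GLOG_LINE = re.compile(
    r'^(?P<sev>[IWEF])(?P<date>\d{4}) (?P<time>\d{2}:\d{2}:\d{2}\.\d+)\s+'
    r'(?P<tid>\d+) (?P<loc>[^\]]+)\] (?P<msg>.*)$')

line = "E0321 10:15:30.123456  4321 client.cpp:31] userid=111&item=222&action=click"
m = GLOG_LINE.match(line)

print(m.group("sev"), m.group("msg"))
```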
1.9 Wiring Thrift and CGI together into a complete log service
Differences between Thrift and CGI:
Benefits of RPC:
1. Security: the data is opaque binary rather than plain text
2. Compression: the binary form is much smaller than the string form, saving bandwidth and improving performance
3. Commonly used between internal services (decoupling), whereas HTTP leans toward the web
Thrift: internal services, decoupling, security (binary encoding)
CGI: web-facing services
Question: nginx can already log requests by itself, so why introduce CGI?
If the goal were only a log server, we would not need it; but to build a recommendation or search engine, this architecture is required.
step1: Start the server.
Server code:
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include "RecSys.h"
#include <iostream>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using boost::shared_ptr;

class RecSysHandler : virtual public RecSysIf {
 public:
  RecSysHandler() {
    // Your initialization goes here
  }

  void rec_data(std::string& _return, const std::string& data) {
    // Your implementation goes here
    // printf("rec_data\n");
    std::cout << "Receive Data: " << data << std::endl;
    _return = "I'm OK !!!";
  }
};

int main(int argc, char **argv) {
  int port = 9090;
  shared_ptr<RecSysHandler> handler(new RecSysHandler());
  shared_ptr<TProcessor> processor(new RecSysProcessor(handler));
  shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
  shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
  shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

  TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
  server.serve();
  return 0;
}
step2: Develop client.cpp:
#include "RecSys.h"
#include <iostream>
#include <string>
#include <stdio.h>
#include <stdlib.h>
#include <fcgi_stdio.h>
#include <fcgiapp.h>
#include <transport/TSocket.h>
#include <transport/TBufferTransports.h>
#include <protocol/TBinaryProtocol.h>
#include <glog/logging.h>

using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace std;

inline void send_response(
        FCGX_Request& request, const string& resp_str) {
    FCGX_FPrintF(request.out, "Content-type: text/html;charset=utf-8\r\n\r\n");
    FCGX_FPrintF(request.out, "%s", resp_str.c_str());
    FCGX_Finish_r(&request);
}

int main(int argc, char **argv) {
    // step 1. init fcgi
    FCGX_Init();
    FCGX_Request request;
    FCGX_InitRequest(&request, 0, 0);

    // step 2. init glog
    FLAGS_log_dir = "/usr/local/src/logserver_base/cgi_demo/logs";
    FLAGS_max_log_size = 100;
    FLAGS_logbufsecs = 0;
    google::InitGoogleLogging(argv[0]);

    // step 3. init thrift
    boost::shared_ptr<TSocket> socket(new TSocket("localhost", 9090));
    boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
    boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
    transport->open();
    RecSysClient client(protocol);

    while (FCGX_Accept_r(&request) >= 0) {
        string query_str = FCGX_GetParam("QUERY_STRING", request.envp);
        cout << "query str: " << query_str << endl;
        LOG(ERROR) << query_str;
        // request to thrift server
        string recevie_data;
        client.rec_data(recevie_data, query_str);
        // return info back over http
        send_response(request, recevie_data);
    }
    return 0;
}
step3: Compile:
g++ -I/usr/local/include/thrift -lthrift -lglog -lfcgi RecSys.cpp RecSys_constants.cpp client.cpp -o client
step4: Hand it to spawn-fcgi.
step5: Test:
First clear the logs directory, then visit in a browser: http://192.168.36.101/behavior_recsys?userid=111&item=222&action=click
The log is written successfully:
The response also comes back to the page and the client:
1.10 Load testing
step1: Install the benchmarking tools: ]# yum install httpd-tools
step2: Run the benchmark: ab -c 20 -n 5000 "http://192.168.36.101/behavior_recsys?userid=111&item=222&action=click"
5000 requests are sent in one run; the results:
The mean time per request is 0.232 ms.
The log line count is correct:
step3: Next, simulate real requests to accumulate more varied logs:
]# cat request.py
import os
import random

base_url = 'http://192.168.36.101/behavior_recsys?'

action_type_list = ['show', 'click', 'collect', 'pay']
ip_addr_list = ['10.1.1.3', '10.5.2.76', '10.1.2.90', '10.0.2.188', '10.10.1.19']

url_list = []
for i in range(1000):
    userid = "userid=" + str(int(random.random() * 50))
    itemid = "itemid=" + str(random.randint(30001, 30050))
    action_type = "type=" + random.sample(action_type_list, 1)[0]
    ip = "ip=" + random.sample(ip_addr_list, 1)[0]
    url = "\"" + base_url + '&'.join([userid, itemid, action_type, ip]) + "\""
    url_list.append(url)

for url in url_list:
    #os.system('ab -c 20 -n 5000 ' + url)
    os.system('curl ' + url)
step4: Performance comparison with a Python web service
Python service code (web.py):
import web

urls = (
    '/(.*)', 'hello'
)
app = web.application(urls, globals())

class hello:
    def GET(self, name):
        return 'ok!!!'

if __name__ == "__main__":
    app.run()
Start it: ]# python pyserver.py 9999
Benchmark:
ab -c 20 -n 5000 http://192.168.36.101:9999/
Results:
The Python service, despite doing almost nothing, averages 2.19 ms per request. The C++ pipeline, despite going through several layers of calls, still beats this much simpler Python service by 2.19 / 0.232, roughly 10x.
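The quoted ratio checks out:

```python
# Mean time per request reported by ab (milliseconds)
python_ms = 2.19   # simple web.py service
cpp_ms = 0.232     # full cgi + thrift + glog pipeline

print(round(python_ms / cpp_ms, 1))
```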
2. Hooking user-behavior logs up to Flume for a real-time stream
2.1 Developing FlumeHbaseEventSerializer
Develop FlumeHbaseEventSerializer in an IDE and build the jar,
then copy the generated /root/IdeaProjects/FlumeTest/target/FlumeTest-1.0-SNAPSHOT.jar
to /usr/local/src/apache-flume-1.6.0-bin/lib.
step1: pom configuration
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>FlumeHbase</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume.flume-ng-sinks</groupId>
            <artifactId>flume-ng-hbase-sink</artifactId>
            <version>1.6.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.3</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
step2: FlumeHbaseEventSerializer implementation:
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.HbaseEventSerializer;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

public class FlumeHbaseEventSerializer implements HbaseEventSerializer {
    public static final String REGEX_CONFIG = "regex";
    public static final String REGEX_DEFAULT = " ";
    public static final String INGNORE_CASE_CONFIG = "regexIgnoreCase";
    public static final boolean INGNORE_CASE_DEFAULT = false;
    public static final String COL_NAME_CONFIG = "colNames";
    public static final String COL_NAME_DEFAULT = "ip";
    public static final String ROW_KEY_INDEX_CONFIG = "rowKeyIndex";
    public static final String ROW_KEY_NAME = "ROW_KEY";
    public static final String DEPOSIT_HEADERS_CONFIG = "depositHeaders";
    public static final boolean DEPOSIT_HEADERS_DEFAULT = false;
    public static final String CHARSET_CONFIG = "charset";
    public static final String CHARSET_DEFAULT = "utf-8";

    protected static final AtomicInteger nonce = new AtomicInteger(0);
    protected static String randomKey = RandomStringUtils.randomAlphanumeric(10);

    protected byte[] cf;
    private byte[] payload;
    private List<byte[]> colNames = Lists.newArrayList();
    private boolean regexIngnoreCase;
    private Charset charset;

    public void initialize(Event event, byte[] columnFamily) {
        event.getHeaders();
        this.payload = event.getBody();
        this.cf = columnFamily;
    }

    public List<Row> getActions() {
        ArrayList<Row> actions = Lists.newArrayList();
        byte[] rowKey;
        // The tailed glog line ends with the raw query string:
        // strip quotes, split on spaces, keep the last field,
        // then split the key=value pairs.
        String body = new String(payload, charset);
        String tmp = body.replace("\"", "");
        String[] arr = tmp.trim().split(" ");
        String log_data = arr[arr.length - 1];
        String[] param_arr = log_data.split("&");
        String userid = param_arr[0].split("=")[1];
        String itemid = param_arr[1].split("=")[1];
        String type = param_arr[2].split("=")[1];
        String ip_str = param_arr[3].split("=")[1];
        // Debug output
        System.out.println("log_data: " + log_data);
        System.out.println(userid);
        System.out.println(itemid);
        System.out.println(type);
        System.out.println(ip_str);
        try {
            rowKey = getRowKey();
            Put put = new Put(rowKey);
            put.add(cf, colNames.get(0), userid.getBytes(Charsets.UTF_8));
            put.add(cf, colNames.get(1), itemid.getBytes(Charsets.UTF_8));
            put.add(cf, colNames.get(2), type.getBytes(Charsets.UTF_8));
            put.add(cf, colNames.get(3), ip_str.getBytes(Charsets.UTF_8));
            actions.add(put);
        } catch (Exception e) {
            throw new FlumeException("could not get row key!", e);
        }
        return actions;
    }

    public List<Increment> getIncrements() {
        return Lists.newArrayList();
    }

    public void close() {
    }

    public void configure(Context context) {
        String regex = context.getString(REGEX_CONFIG, REGEX_DEFAULT);
        regexIngnoreCase = context.getBoolean(INGNORE_CASE_CONFIG, INGNORE_CASE_DEFAULT);
        context.getBoolean(DEPOSIT_HEADERS_CONFIG, DEPOSIT_HEADERS_DEFAULT);
        Pattern.compile(regex, Pattern.DOTALL + (regexIngnoreCase ? Pattern.CASE_INSENSITIVE : Pattern.UNIX_LINES));
        charset = Charset.forName(context.getString(CHARSET_CONFIG, CHARSET_DEFAULT));
        String cols = context.getString("columns");
        String colNameStr;
        if (cols != null && !"".equals(cols)) {
            colNameStr = cols;
        } else {
            colNameStr = context.getString(COL_NAME_CONFIG, COL_NAME_DEFAULT);
        }
        String[] colmnNames = colNameStr.split(",");
        for (String s : colmnNames) {
            colNames.add(s.getBytes(charset));
        }
    }

    public void configure(ComponentConfiguration componentConfiguration) {
    }

    protected byte[] getRowKey(Calendar cal) {
        // Row key: the ip field of the log line plus a process-wide counter
        String str = new String(payload, charset);
        String tmp = str.replace("\"", "");
        String[] arr = tmp.split(" ");
        String log_data = arr[arr.length - 1];
        String[] param_arr = log_data.split("&");
        String ip_str = param_arr[3];
        String rowKey = ip_str + "-" + nonce.getAndIncrement();
        return rowKey.getBytes(charset);
    }

    protected byte[] getRowKey() {
        return getRowKey(Calendar.getInstance());
    }
}
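The parsing that the serializer performs (strip quotes, split the glog line on spaces, keep the last field, then split the `key=value` pairs) can be sanity-checked with a quick Python sketch; the sample line below is illustrative:

```python
def parse_action_log(line):
    # Mirror of the Java logic: drop quotes, take the last
    # space-separated field, then split the key=value pairs.
    tmp = line.replace('"', '').strip()
    log_data = tmp.split(' ')[-1]
    return [kv.split('=')[1] for kv in log_data.split('&')]

line = 'E0321 10:15:30.123456 4321 client.cpp:31] userid=3&itemid=30017&type=click&ip=10.1.1.3'
userid, itemid, type_, ip = parse_action_log(line)

print(userid, itemid, type_, ip)
```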
step3: Build the jar.
step4: Copy the jar into Flume's lib directory.
2.2 Flume server and client configuration
step1: Server configuration
In /usr/local/src/apache-flume-1.6.0-bin/conf/logserver_flume_hbase, specify the sink.
Flume server config file, flume-server.properties:
# Name the components of this agent
a1.channels = c1
a1.sources = r1
a1.sinks = k1
# set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.type = avro
a1.sources.r1.bind = master
a1.sources.r1.port = 52020
a1.sinks.k1.type=hbase
a1.sinks.k1.table=user_action_table
a1.sinks.k1.columnFamily=action_log
a1.sinks.k1.serializer=com.badou.FlumeHbaseEventSerializer
a1.sinks.k1.serializer.columns=userid,itemid,type,ip
# Wire source, channel and sink together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
step2: Flume clients (the Flume agents on slave1 and master act as clients)
Flume client config file, flume-client.properties:
# Name the components of this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Configure the source component r1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/logserver_study/cgi_demo/logs/client.ERROR
# Configure the sink component k1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 52020
# Wire source, channel and sink together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.3 Before wiring Flume to HBase, prepare Hadoop and HBase
step1: Start ZooKeeper, which HBase depends on.
step2: Start Hadoop: start-all.sh
step3: Start HBase: start-hbase.sh
step4: Check the processes.
step5: Create the log table in HBase:
Open the HBase shell and create the table:
2.4 Start Flume and connect it to HBase
step1: Start the server on the master node:
]# ./bin/flume-ng agent --conf conf --conf-file ./conf/logserver_flume_hbase/flume-server.properties --name a1 -Dflume.root.logger=INFO,console
step2: Start the client on the master node:
]# ./bin/flume-ng agent --conf conf --conf-file ./conf/logserver_flume_hbase/flume-client.properties --name a1 -Dflume.root.logger=INFO,console
step3: Start the client on the slave1 node:
]# ./bin/flume-ng agent --conf conf --conf-file ./conf/logserver_flume_hbase/flume-client.properties --name a1 -Dflume.root.logger=INFO,console
2.5 Log collection test
Simulate a visit in the browser: http://192.168.36.101/behavior_recsys?userid=111&item=222&action=click&ip=1.0.0.10
Watch the Flume server console:
All the log lines are printed, so collection works, but there is an error:
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Put.setWriteToWAL(Z)V
This error means my HBase and Flume versions are incompatible (Flume 1.6.0 against this HBase build).
After switching Flume to 1.7 and restarting the Flume server and client, the server console shows no more errors.
Check whether data was inserted into HBase's user_action_table:
The user_action_table in HBase now has data, so the pipeline from data collection to data storage is fully connected!
2.6 NGINX load balancing (for high concurrency)
step1: Configuration plan:
192.168.87.10 master ——> server1
192.168.87.11 slave1 ——> server2
192.168.87.12 slave2 ——> proxy
step2: nginx configuration
- master's configuration is unchanged:
location ~ /behavior_recsys$ {
    fastcgi_pass 127.0.0.1:8099;
    include fastcgi_params;
}
- slave1's configuration
1. Same as master, provided slave1 can serve RPC just like master; build and start the RPC client there:
]# g++ -I/usr/local/include/thrift -lthrift -lglog -lfcgi RecSys.cpp RecSys_constants.cpp client.cpp -o client
/usr/local/bin/spawn-fcgi -a 127.0.0.1 -p 8099 -f /usr/local/src/logserver_base/schema/gen-cpp/client
2. Alternatively, have slave1's nginx call master's RPC service directly:
location ~ /behavior_recsys$ {
    fastcgi_pass 192.168.36.101:8099;
    include fastcgi_params;
}
- slave2's proxy/dispatch configuration:
upstream recsys {
    server 192.168.36.101:80;
    server 192.168.36.102:80;
}
location ~ /behavior_recsys$ {
    proxy_pass http://recsys;
}
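With equal weights, nginx's default upstream scheduling is round-robin; conceptually slave2 just cycles through the two backends:

```python
from itertools import cycle

# The two backends from the upstream block above
backends = cycle(["192.168.36.101:80", "192.168.36.102:80"])

picks = [next(backends) for _ in range(4)]
print(picks)
```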
step3: Go into the nginx home directory and reload: ./sbin/nginx -s reload
2.7 Flume concurrency
step1: Copy the Flume jar and the flume-server.properties and flume-client.properties files from master's Flume conf directory over to slave1.
step2: From the Flume conf directory, start the client on slave1:
./bin/flume-ng agent --conf conf --conf-file ./conf/logserver_flume_hbase/flume-client.properties --name a1 -Dflume.root.logger=INFO,console
2.8 Test: hit slave2's service directly and check whether it forwards to master or slave1 and the data gets stored. Visit:
http://192.168.36.103/behavior_recsys?userid=123&item=111&action=click&ip=1.0.0.10
As shown in the figure, the data is inserted successfully:
This completes the high-concurrency data collection pipeline from NGINX to CGI to the server to Flume, and finally into HBase.