源码
1. 创建一个mosquitto_auth.proto
syntax = "proto3";
option go_package = "./mosquitto_auth";
package mosquitto_auth;
// Greeter is the gRPC service called by the plugin's Go client. It exposes
// one RPC for username/password authentication and one for ACL checks.
service Greeter {
// BasicAuth receives a client's credentials and answers with a result code.
rpc BasicAuth (BasicAuthRequest) returns (BasicAuthReply) {}
// AclCheck receives a topic access request and answers with a result code.
rpc AclCheck (AclCheckRequest) returns (AclCheckReply) {}
}
// BasicAuthRequest carries the data the broker plugin has about a client
// that is authenticating.
message BasicAuthRequest {
// Username supplied by the client.
string username = 1;
// Password supplied by the client.
string password = 2;
// Client id, as reported by mosquitto_client_id() in the C plugin.
string clientId = 3;
// Client address, as reported by mosquitto_client_address() in the C plugin.
string clientAddress = 4;
}
// BasicAuthReply is the server's answer to BasicAuth.
message BasicAuthReply {
// Result code; the Go client returns it unchanged to the C plugin, which
// returns it from the auth callback.
// NOTE(review): presumably a MOSQ_ERR_* value — confirm against the server.
int32 code = 1;
}
// AclCheckRequest describes one topic access that the broker wants checked.
message AclCheckRequest {
// Username of the client, as reported by mosquitto_client_username().
string username = 1;
// Client id, as reported by mosquitto_client_id().
string clientId = 2;
// Topic being accessed.
string topic = 3;
// Access value taken from the ACL check event.
// NOTE(review): presumably one of the MOSQ_ACL_* constants — confirm.
int32 access = 4;
// QoS value taken from the ACL check event.
int32 qos = 5;
// Retain flag; the C plugin sends 1 when set and 0 otherwise.
int32 retain = 6;
}
// AclCheckReply is the server's answer to AclCheck.
message AclCheckReply {
// Result code; the Go client returns it unchanged to the C plugin, which
// returns it from the ACL callback.
// NOTE(review): presumably a MOSQ_ERR_* value — confirm against the server.
int32 code = 1;
}
- 主要是两个方法
BasicAuth
和AclCheck
2.使用golang创建GRPC客户端,然后将其编译成C语言动态库
package main
/**/
import "C"
import (
"context"
"log"
"mosquitto_auth_plugin/mosq_err"
pb "mosquitto_auth_plugin/mosquitto_auth"
"time"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// clientConn is the package-level gRPC connection used by the exported
// Plugin* functions. Its zero value is nil.
var clientConn *grpc.ClientConn
// PluginInit sets up the package-level gRPC connection to the target given
// in addr (a NUL-terminated C string). A connection left over from an earlier
// call is closed first. It returns MOSQ_ERR_SUCCESS when the connection
// object was created and MOSQ_ERR_UNKNOWN otherwise.
//
//export PluginInit
func PluginInit(addr *C.char) C.int {
	// clientConn is nil until the first successful call; calling GetState on
	// a nil *grpc.ClientConn would panic, so check for nil first.
	if clientConn != nil && clientConn.GetState() != connectivity.Shutdown {
		clientConn.Close()
	}
	clientConn = nil

	// Dial into a separate local: `clientConn, err := ...` would declare a new
	// local variable and leave the package-level connection unset.
	conn, err := grpc.Dial(C.GoString(addr), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil || conn == nil {
		// log.Printf rather than log.Fatalf: Fatalf calls os.Exit and would
		// take down the host process that loaded this library.
		log.Printf("[PluginInit] did not connect: %v", err)
		return mosq_err.MOSQ_ERR_UNKNOWN
	}
	clientConn = conn
	return mosq_err.MOSQ_ERR_SUCCESS
}
// PluginBasicAuth forwards a client's credentials to the BasicAuth RPC and
// returns the code from the reply. All arguments are NUL-terminated C
// strings. The call is bounded by a one-second timeout. MOSQ_ERR_UNKNOWN is
// returned when the connection has not been initialized or the RPC fails.
//
//export PluginBasicAuth
func PluginBasicAuth(username *C.char, password *C.char, clientId *C.char, clientAddress *C.char) C.int {
	// Without a connection the RPC below would dereference nil and panic.
	if clientConn == nil {
		log.Printf("[PluginBasicAuth] gRPC connection is not initialized")
		return mosq_err.MOSQ_ERR_UNKNOWN
	}

	req := &pb.BasicAuthRequest{
		Username:      C.GoString(username),
		Password:      C.GoString(password),
		ClientId:      C.GoString(clientId),
		ClientAddress: C.GoString(clientAddress),
	}

	c := pb.NewGreeterClient(clientConn)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	r, err := c.BasicAuth(ctx, req)
	if err != nil {
		// log.Printf rather than log.Fatalf: a failed or timed-out RPC must
		// not terminate the host process; report the error code instead.
		log.Printf("[PluginBasicAuth]could not greet: %v", err)
		return mosq_err.MOSQ_ERR_UNKNOWN
	}
	log.Printf("[PluginBasicAuth] Greeting: %d", r.GetCode())
	return C.int(r.GetCode())
}
// PluginAclCheck forwards a topic access request to the AclCheck RPC and
// returns the code from the reply. username, clientId and topic are
// NUL-terminated C strings; access, qos and retain are passed through as
// int32 values. The call is bounded by a one-second timeout. MOSQ_ERR_UNKNOWN
// is returned when the connection has not been initialized or the RPC fails.
//
//export PluginAclCheck
func PluginAclCheck(username *C.char, clientId *C.char, topic *C.char, access C.int, qos C.int, retain C.int) C.int {
	// Without a connection the RPC below would dereference nil and panic.
	if clientConn == nil {
		log.Printf("[PluginAclCheck] gRPC connection is not initialized")
		return mosq_err.MOSQ_ERR_UNKNOWN
	}

	req := &pb.AclCheckRequest{
		Username: C.GoString(username),
		ClientId: C.GoString(clientId),
		Topic:    C.GoString(topic),
		Access:   int32(access),
		// qos was previously accepted but never sent to the server.
		Qos:    int32(qos),
		Retain: int32(retain),
	}

	c := pb.NewGreeterClient(clientConn)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	r, err := c.AclCheck(ctx, req)
	if err != nil {
		// log.Printf rather than log.Fatalf: a failed or timed-out RPC must
		// not terminate the host process; report the error code instead.
		log.Printf("[PluginAclCheck] could not greet: %v", err)
		return mosq_err.MOSQ_ERR_UNKNOWN
	}
	log.Printf("[PluginAclCheck] Greeting: %d", r.GetCode())
	return C.int(r.GetCode())
}
// main is intentionally empty: this package is built as a C shared library
// (go build -buildmode=c-shared) and is used only through the exported
// Plugin* functions.
func main() {}
- 这里主要是将C语言传入的参数类型转为golang参数类型,然后直接调用GRPC方法即可
- 执行
go build -buildmode=c-shared -o grpc_auth.so
即可得到我们想要的动态库
3.使用c语言动态库方法调用grpc_auth.so
3.1 插件初始化时打开动态库
/*
 * Plugin entry point: loads the gRPC shared library from grpc_path, keeps the
 * handle in grpc_handle, then runs callback_init().
 *
 * Returns MOSQ_ERR_PLUGIN_DEFER when the library cannot be opened; otherwise
 * returns the result of callback_init().
 */
int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, struct mosquitto_opt *opts, int opt_count)
{
    /* Open the gRPC plugin library. */
    grpc_handle = dlopen(grpc_path, RTLD_NOW);
    if (!grpc_handle)
    {
        mosquitto_log_printf(MOSQ_LOG_ERR, "[mosquitto_plugin_init] grpc_path = %s, dlerror = %s", grpc_path, dlerror());
        return MOSQ_ERR_PLUGIN_DEFER;
    }

    /* Initialise the library; a failure is logged and its code passed on. */
    err_code = callback_init();
    if (err_code != MOSQ_ERR_SUCCESS)
    {
        mosquitto_log_printf(MOSQ_LOG_ERR, "[mosquitto_plugin_init] callback_init err = %d", err_code);
    }
    return err_code;
}
- 使用
dlopen
打开对应路径的动态库,并保存handle
3.2 回调函数调用动态库里面的方法
调用动态库PluginBasicAuth
方法
static int callback_basic_auth(int event, void *event_data, void *userdata)
{
UNUSED(event);
UNUSED(userdata);
struct mosquitto_evt_basic_auth *ed = event_data;
char *client_id = mosquitto_client_id(ed->client);
char *client_address = mosquitto_client_address(ed->client);
mosquitto_log_printf(MOSQ_LOG_INFO, "[callback_basic_auth] username=%s, password=%s, client_id=%s, client_address=%s", ed->username, ed->password, client_id, client_address);
PluginBasicAuth pluginBasicAuth = (PluginBasicAuth)dlsym(grpc_handle, "PluginBasicAuth");
if (!pluginBasicAuth)
{
mosquitto_log_printf(MOSQ_LOG_ERR, "[mosquitto_plugin_init] PluginBasicAuth undefined");
return MOSQ_ERR_PLUGIN_DEFER;
}
return pluginBasicAuth(ed->username, ed->password, client_id, client_address);
}
调用动态库PluginAclCheck
方法
static int callback_acl_check(int event, void *event_data, void *userdata)
{
UNUSED(event);
UNUSED(userdata);
struct mosquitto_evt_acl_check *ed = event_data;
char *username = mosquitto_client_username(ed->client);
char *client_id = mosquitto_client_id(ed->client);
char *topic = ed->topic;
int access = ed->access;
int qos = ed->qos;
int retain = ed->retain ? 1 : 0;
mosquitto_log_printf(MOSQ_LOG_INFO, "[callback_acl_check] username=%s, client_id=%s, topic=%s, access=%d, qos=%d, retain=%d", username, client_id, topic, access, qos, retain);
PluginAclCheck pluginAclCheck = (PluginAclCheck)dlsym(grpc_handle, "PluginAclCheck");
if (!pluginAclCheck)
{
mosquitto_log_printf(MOSQ_LOG_ERR, "[mosquitto_plugin_init] PluginAclCheck undefined");
return MOSQ_ERR_PLUGIN_DEFER;
}
return pluginAclCheck(username, client_id, topic, access, qos, retain);
}
3.3 插件结束时关闭动态库
/*
 * Plugin exit point: closes the gRPC shared library if it is open.
 * Always returns MOSQ_ERR_SUCCESS.
 */
int mosquitto_plugin_cleanup(void *user_data, struct mosquitto_opt *opts, int opt_count)
{
    UNUSED(user_data);
    UNUSED(opts);
    UNUSED(opt_count);

    /* Skip dlclose when no library is loaded, and clear the handle afterwards
     * so that it cannot be closed twice or used after being closed. */
    if (grpc_handle)
    {
        dlclose(grpc_handle);
        grpc_handle = NULL;
    }
    return MOSQ_ERR_SUCCESS;
}
4.配置加载动态库参数
# mosquitto.conf
plugin /mosquitto/dll/mosquitto_payload_modification.so
plugin_opt_grpc_path /mosquitto/dll/grpc_auth.so
plugin_opt_grpc_addr 127.0.0.1:10086
- 这里由C语言编写的动态库插件去调用golang生成的动态库,从而使用GRPC