分析了SimpleMessage和更上一层的数据结构,再来看看底层的通信是怎么实现的。前面已分析过SmplMsgConnection这个基类,它是对SimpleMessage底层通信的一层抽象。而SimpleSocket类是SmplMsgConnection的派生类,它定义了当采用socket做通信时的一些socket方法接口,后面编写的TCP server,TCP client,UDP server,UDP client均是基于SimpleSocket来实现的。
namespace industrial
{
namespace simple_socket
{
namespace StandardSocketPorts
{
enum StandardSocketPort
{
MOTION = 11000, SYSTEM = 11001, STATE = 11002, IO = 11003
};
}
typedef StandardSocketPorts::StandardSocketPort StandardSocketPort;
class SimpleSocket : public industrial::smpl_msg_connection::SmplMsgConnection
{
public:
SimpleSocket()
{
this->setSockHandle(this->SOCKET_FAIL);
memset(&this->sockaddr_, 0, sizeof(this->sockaddr_));
this->setConnected(false);
}
virtual ~SimpleSocket(){}
bool isConnected()
{
return connected_;
}
virtual void setDisconnected()
{
setConnected(false);
}
bool isReadyReceive(int timeout)
{
bool r, e;
rawPoll(timeout, r, e);
return r;
}
protected:
//接收和发送数据的socket句柄
int sock_handle_;
//远程socket地址和端口号
sockaddr_in sockaddr_;
//socket连接状态
bool connected_;
//socket错误返回值
static const int SOCKET_FAIL = -1;
//接收数据最大字节数
static const int MAX_BUFFER_SIZE = 1024;
//查询socket是否ready的超时参数,1000ms
static const int SOCKET_POLL_TO = 1000;
//用于接收数据的内部缓冲区
char buffer_[MAX_BUFFER_SIZE + 1];
//获取socket句柄
int getSockHandle() const
{
return sock_handle_;
}
//设置socket句柄
void setSockHandle(int sock_handle_)
{
this->sock_handle_ = sock_handle_;
}
//错误报告
__attribute__((deprecated(
"Please use: logSocketError(const char* msg, const int rc, const int error_no)")))
void logSocketError(const char* msg, int rc)
{
logSocketError(msg, rc, errno);
}
void logSocketError(const char* msg, const int rc, const int error_no)
{
LOG_ERROR("%s, rc: %d. Error: '%s' (errno: %d)", msg, rc, strerror(error_no), error_no);
}
// 发送和接收方法,需派生类重写 rawSendBytes 和 rawReceiveBytes
// Virtual
bool sendBytes(industrial::byte_array::ByteArray & buffer);
bool receiveBytes(industrial::byte_array::ByteArray & buffer,
industrial::shared_types::shared_int num_bytes);
// Virtual
virtual int rawSendBytes(char *buffer,
industrial::shared_types::shared_int num_bytes)=0;
virtual int rawReceiveBytes(char *buffer,
industrial::shared_types::shared_int num_bytes)=0;
//查询socket上是否有数据
virtual bool rawPoll(int timeout, bool & ready, bool & error)=0;
virtual void setConnected(bool connected)
{
this->connected_ = connected;
}
};
} //simple_socket
} //industrial
以上代码定义了socket通信的接口,数据成员包括socket句柄(也就是文件描述符),地址和端口号,连接状态,内部数据缓冲区等。可以看出rawSendBytes、rawReceiveBytes,rawPoll并没有做实现,这3个方法是放到其派生类TcpSocket中实现了,此处只是定义了接口的功能。
sendBytes
bool SimpleSocket::sendBytes(ByteArray & buffer)
{
int rc = this->SOCKET_FAIL;
bool rtn = false;
if (this->isConnected())
{
// Nothing restricts the ByteArray from being larger than the what the socket
// can handle.
if (this->MAX_BUFFER_SIZE > (int)buffer.getBufferSize())
{
// copy to local array, since ByteArray no longer supports
// direct pointer-access to data values
std::vector<char> localBuffer;
buffer.copyTo(localBuffer);
rc = rawSendBytes(&localBuffer[0], localBuffer.size());
if (this->SOCKET_FAIL != rc)
{
rtn = true;
}
else
{
rtn = false;
logSocketError("Socket sendBytes failed", rc, errno);
}
}
else
{
LOG_ERROR("Buffer size: %u, is greater than max socket size: %u", buffer.getBufferSize(), this->MAX_BUFFER_SIZE);
rtn = false;
}
}
else
{
rtn = false;
LOG_WARN("Not connected, bytes not sent");
}
if (!rtn)
{
this->setConnected(false);
}
return rtn;
}
sendBytes实现的是SmplMsgConnection中的虚方法,功能是发送ByteArray类型的数据。首先确保socket能发送的字节数比待发送数据的数据字节数要大,然后拷贝数据至临时变量,再调用rawSendBytes
将数据发送出去,并返回发送结果至上层。
receiveBytes
bool SimpleSocket::receiveBytes(ByteArray & buffer, shared_int num_bytes)
{
int rc = this->SOCKET_FAIL;
bool rtn = false;
shared_int remainBytes = num_bytes;
bool ready, error;
memset(&this->buffer_, 0, sizeof(this->buffer_));
if (this->MAX_BUFFER_SIZE > buffer.getMaxBufferSize())
{
LOG_WARN("Socket buffer max size: %u, is larger than byte array buffer: %u",
this->MAX_BUFFER_SIZE, buffer.getMaxBufferSize());
}
if (this->isConnected())
{
buffer.init();
while (remainBytes > 0)
{
// Polling the socket results in an "interruptable" socket read. This
// allows Control-C to break out of a socket read. Without polling,
// a sig-term is required to kill a program in a socket read function.
if (this->rawPoll(this->SOCKET_POLL_TO, ready, error))
{
if(ready)
{
rc = rawReceiveBytes(this->buffer_, remainBytes);
if (this->SOCKET_FAIL == rc)
{
this->logSocketError("Socket received failed", rc, errno);
remainBytes = 0;
rtn = false;
break;
}
else if (0 == rc)
{
LOG_WARN("Recieved zero bytes: %u", rc);
remainBytes = 0;
rtn = false;
break;
}
else
{
remainBytes = remainBytes - rc;
LOG_COMM("Byte array receive, bytes read: %u, bytes reqd: %u, bytes left: %u",
rc, num_bytes, remainBytes);
buffer.load(&this->buffer_, rc);
rtn = true;
}
}
else if(error)
{
LOG_ERROR("Socket poll returned an error");
rtn = false;
break;
}
else
{
LOG_ERROR("Uknown error from socket poll");
rtn = false;
break;
}
}
else
{
LOG_COMM("Socket poll timeout, trying again");
}
}
}
else
{
LOG_WARN("Not connected, bytes not sent");
rtn = false;
}
if (!rtn)
{
this->setConnected(false);
}
return rtn;
}
receiveBytes实现的是SmplMsgConnection中的虚方法,功能是从socket上接收数据至ByteArray类型的变量。首先会检查socket能接收的最大字节数是否大于ByteArray配置的最大字节数,如果大于,则给出警告,因为可能会丢失数据。接着会调用rawPoll
查询socket上的数据是否可读,最长阻塞时间为1000ms。如果可读,则调用rawReceiveBytes
接收数据,接收到有效数据后调用buffer.load
将buffer_内的数据加载到传进来的buffer中,并返回true给上层。
下一篇将介绍TcpSocket这个类,看一下上面的虚方法rawSendBytes,rawReceiveBytes,rawPoll是如何实现的。