我来帮您实现一个Android IM的Java Socket连接,包含心跳、消息收发和Protocol Buffers支持。
1. 首先添加依赖
<gradle>
// app/build.gradledependencies { implementation 'com.google.protobuf:protobuf-java:3.21.12' implementation 'com.google.protobuf:protobuf-java-util:3.21.12'}
2. 定义Protocol Buffer消息格式
<protobuf>
// message.protosyntax = "proto3";package im;option java_package = "com.example.im.proto";option java_outer_classname = "IMProto";// 消息类型enum MessageType { UNKNOWN = 0; HEARTBEAT = 1; TEXT_MESSAGE = 2; LOGIN = 3; LOGIN_RESPONSE = 4; ACK = 5;}// 基础消息message BaseMessage { MessageType type = 1; int64 timestamp = 2; string messageId = 3; bytes data = 4;}// 心跳消息message HeartbeatMessage { int64 clientTime = 1;}// 文本消息message TextMessage { string fromUserId = 1; string toUserId = 2; string content = 3; int64 sendTime = 4;}// 登录消息message LoginMessage { string userId = 1; string token = 2;}// 登录响应message LoginResponse { bool success = 1; string message = 2;}
3. Socket连接管理器
<java>
import android.os.Handler;import android.os.HandlerThread;import android.os.Looper;import android.util.Log;import com.example.im.proto.IMProto;import com.google.protobuf.ByteString;import com.google.protobuf.InvalidProtocolBufferException;import java.io.DataInputStream;import java.io.DataOutputStream;import java.io.IOException;import java.net.Socket;import java.util.UUID;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.atomic.AtomicBoolean;public class IMSocketManager { private static final String TAG = "IMSocketManager"; private static final int HEARTBEAT_INTERVAL = 30000; // 30秒心跳间隔 private static final int CONNECT_TIMEOUT = 10000; // 连接超时时间 private Socket socket; private DataInputStream inputStream; private DataOutputStream outputStream; private HandlerThread sendThread; private Handler sendHandler; private HandlerThread receiveThread; private Handler receiveHandler; private Handler mainHandler; private AtomicBoolean isConnected = new AtomicBoolean(false); private AtomicBoolean isConnecting = new AtomicBoolean(false); private ConcurrentLinkedQueue<IMProto.BaseMessage> sendQueue = new ConcurrentLinkedQueue<>(); private String serverHost; private int serverPort; private IMConnectionListener connectionListener; private IMMessageListener messageListener; // 心跳相关 private Runnable heartbeatRunnable = new Runnable() { @Override public void run() { sendHeartbeat(); if (isConnected.get()) { sendHandler.postDelayed(this, HEARTBEAT_INTERVAL); } } }; public interface IMConnectionListener { void onConnected(); void onDisconnected(); void onConnectFailed(String error); } public interface IMMessageListener { void onMessageReceived(IMProto.BaseMessage message); void onMessageSent(String messageId); void onMessageFailed(String messageId, String error); } public IMSocketManager(String host, int port) { this.serverHost = host; this.serverPort = port; this.mainHandler = new Handler(Looper.getMainLooper()); initThreads(); } private void initThreads() { sendThread = new HandlerThread("IM-Send-Thread"); sendThread.start(); sendHandler = new Handler(sendThread.getLooper()); receiveThread = new HandlerThread("IM-Receive-Thread"); receiveThread.start(); receiveHandler = new Handler(receiveThread.getLooper()); } public void setConnectionListener(IMConnectionListener listener) { this.connectionListener = listener; } public void setMessageListener(IMMessageListener listener) { this.messageListener = listener; } // 连接服务器 public void connect() { if (isConnecting.get() || isConnected.get()) { Log.w(TAG, "Already connected or connecting"); return; } isConnecting.set(true); sendHandler.post(() -> { try { socket = new Socket(serverHost, serverPort); socket.setSoTimeout(0); // 无超时 socket.setKeepAlive(true); socket.setTcpNoDelay(true); inputStream = new DataInputStream(socket.getInputStream()); outputStream = new DataOutputStream(socket.getOutputStream()); isConnected.set(true); isConnecting.set(false); // 开始接收消息 startReceiving(); // 开始心跳 sendHandler.postDelayed(heartbeatRunnable, HEARTBEAT_INTERVAL); // 通知连接成功 mainHandler.post(() -> { if (connectionListener != null) { connectionListener.onConnected(); } }); // 处理发送队列中的消息 processSendQueue(); } catch (IOException e) { Log.e(TAG, "Connect failed", e); isConnecting.set(false); mainHandler.post(() -> { if (connectionListener != null) { connectionListener.onConnectFailed(e.getMessage()); } }); } }); } // 断开连接 public void disconnect() { isConnected.set(false); sendHandler.removeCallbacks(heartbeatRunnable); sendHandler.post(() -> { try { if (socket != null && !socket.isClosed()) { socket.close(); } } catch (IOException e) { Log.e(TAG, "Disconnect error", e); } mainHandler.post(() -> { if (connectionListener != null) { connectionListener.onDisconnected(); } }); }); } // 发送消息 public void sendMessage(IMProto.BaseMessage message) { if (!isConnected.get()) { sendQueue.offer(message); connect(); // 尝试重连 return; } sendHandler.post(() -> { try { writeMessage(message); mainHandler.post(() -> { if (messageListener != null) { messageListener.onMessageSent(message.getMessageId()); } }); } catch (IOException e) { Log.e(TAG, "Send message failed", e); mainHandler.post(() -> { if (messageListener != null) { messageListener.onMessageFailed(message.getMessageId(), e.getMessage()); } }); // 发送失败,尝试重连 handleConnectionError(); } }); } // 发送文本消息 public void sendTextMessage(String toUserId, String content) { IMProto.TextMessage textMessage = IMProto.TextMessage.newBuilder() .setFromUserId(getCurrentUserId()) .setToUserId(toUserId) .setContent(content) .setSendTime(System.currentTimeMillis()) .build(); IMProto.BaseMessage baseMessage = IMProto.BaseMessage.newBuilder() .setType(IMProto.MessageType.TEXT_MESSAGE) .setTimestamp(System.currentTimeMillis()) .setMessageId(UUID.randomUUID().toString()) .setData(textMessage.toByteString()) .build(); sendMessage(baseMessage); } // 发送登录消息 public void sendLoginMessage(String userId, String token) { IMProto.LoginMessage loginMessage = IMProto.LoginMessage.newBuilder() .setUserId(userId) .setToken(token) .build(); IMProto.BaseMessage baseMessage = IMProto.BaseMessage.newBuilder() .setType(IMProto.MessageType.LOGIN) .setTimestamp(System.currentTimeMillis()) .setMessageId(UUID.randomUUID().toString()) .setData(loginMessage.toByteString()) .build(); sendMessage(baseMessage); } // 发送心跳 private void sendHeartbeat() { IMProto.HeartbeatMessage heartbeat = IMProto.HeartbeatMessage.newBuilder() .setClientTime(System.currentTimeMillis()) .build(); IMProto.BaseMessage baseMessage = IMProto.BaseMessage.newBuilder() .setType(IMProto.MessageType.HEARTBEAT) .setTimestamp(System.currentTimeMillis()) .setMessageId(UUID.randomUUID().toString()) .setData(heartbeat.toByteString()) .build(); sendMessage(baseMessage); } // 写入消息到流 private void writeMessage(IMProto.BaseMessage message) throws IOException { byte[] data = message.toByteArray(); outputStream.writeInt(data.length); outputStream.write(data); outputStream.flush(); } // 开始接收消息 private void startReceiving() { receiveHandler.post(() -> { while (isConnected.get()) { try { int length = inputStream.readInt(); if (length > 0 && length < 1024 * 1024) { // 限制消息大小为1MB byte[] data = new byte[length]; inputStream.readFully(data); IMProto.BaseMessage message = IMProto.BaseMessage.parseFrom(data); handleReceivedMessage(message); } } catch (IOException | InvalidProtocolBufferException e) { Log.e(TAG, "Receive message error", e); if (isConnected.get()) { handleConnectionError(); } break; } } }); } // 处理接收到的消息 private void handleReceivedMessage(IMProto.BaseMessage message) { switch (message.getType()) { case HEARTBEAT: // 心跳响应,不需要特殊处理 Log.d(TAG, "Received heartbeat response"); break; case TEXT_MESSAGE: handleTextMessage(message); break; case LOGIN_RESPONSE: handleLoginResponse(message); break; default: mainHandler.post(() -> { if (messageListener != null) { messageListener.onMessageReceived(message); } }); break; } } // 处理文本消息 private void handleTextMessage(IMProto.BaseMessage baseMessage) { try { IMProto.TextMessage textMessage = IMProto.TextMessage.parseFrom(baseMessage.getData()); Log.d(TAG, "Received text message: " + textMessage.getContent()); mainHandler.post(() -> { if (messageListener != null) { messageListener.onMessageReceived(baseMessage); } }); // 发送ACK确认 sendAck(baseMessage.getMessageId()); } catch (InvalidProtocolBufferException e) { Log.e(TAG, "Parse text message error", e); } } // 处理登录响应 private void handleLoginResponse(IMProto.BaseMessage baseMessage) { try { IMProto.LoginResponse response = IMProto.LoginResponse.parseFrom(baseMessage.getData()); Log.d(TAG, "Login response: " + response.getSuccess() + ", " + response.getMessage()); mainHandler.post(() -> { if (messageListener != null) { messageListener.onMessageReceived(baseMessage); } }); } catch (InvalidProtocolBufferException e) { Log.e(TAG, "Parse login response error", e); } } // 发送ACK确认 private void sendAck(String messageId) { IMProto.BaseMessage ackMessage = IMProto.BaseMessage.newBuilder() .setType(IMProto.MessageType.ACK) .setTimestamp(System.currentTimeMillis()) .setMessageId(UUID.randomUUID().toString()) .setData(ByteString.copyFromUtf8(messageId)) .build(); sendMessage(ackMessage); } // 处理发送队列 private void processSendQueue() { while (!sendQueue.isEmpty()) { IMProto.BaseMessage message = sendQueue.poll(); if (message != null) { sendMessage(message); } } } // 处理连接错误 private void handleConnectionError() { disconnect(); // 可以在这里实现重连逻辑 } // 获取当前用户ID(需要根据实际情况实现) private String getCurrentUserId() { return "current_user_id"; } // 清理资源 public void destroy() { disconnect(); sendThread.quit(); receiveThread.quit(); }}
4. 使用示例
<java>
public class MainActivity extends AppCompatActivity { private IMSocketManager imManager; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); // 初始化IM管理器 imManager = new IMSocketManager("your.server.com", 8080); // 设置连接监听器 imManager.setConnectionListener(new IMSocketManager.IMConnectionListener() { @Override public void onConnected() { Log.d("MainActivity", "Connected to server"); // 连接成功后发送登录消息 imManager.sendLoginMessage("user123", "auth_token"); } @Override public void onDisconnected() { Log.d("MainActivity", "Disconnected from server"); } @Override public void onConnectFailed(String error) { Log.e("MainActivity", "Connect failed: " + error); } }); // 设置消息监听器 imManager.setMessageListener(new IMSocketManager.IMMessageListener() { @Override public void onMessageReceived(IMProto.BaseMessage message) { if (message.getType() == IMProto.MessageType.TEXT_MESSAGE) { try { IMProto.TextMessage textMessage = IMProto.TextMessage.parseFrom(message.getData()); // 处理接收到的文本消息 updateUI(textMessage); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } } @Override public void onMessageSent(String messageId) { Log.d("MainActivity", "Message sent: " + messageId); } @Override public void onMessageFailed(String messageId, String error) { Log.e("MainActivity", "Message failed: " + messageId + ", error: " + error); } }); // 连接服务器 imManager.connect(); // 发送消息示例 findViewById(R.id.sendButton).setOnClickListener(v -> { EditText editText = findViewById(R.id.messageEditText); String content = editText.getText().toString(); if (!TextUtils.isEmpty(content)) { imManager.sendTextMessage("target_user_id", content); editText.setText(""); } }); } private void updateUI(IMProto.TextMessage textMessage) { runOnUiThread(() -> { // 更新UI显示消息 TextView messageView = findViewById(R.id.messageTextView); messageView.append(textMessage.getFromUserId() + ": " + textMessage.getContent() + "\n"); }); } @Override protected void onDestroy() { super.onDestroy(); if (imManager != null) { imManager.destroy(); } }}
5. 添加网络权限
<xml>
<!-- AndroidManifest.xml --><uses-permission android:name="android.permission.INTERNET" /><uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
6. 消息存储(可选)
<java>
// 使用Room数据库存储消息@Entity(tableName = "messages")public class MessageEntity { @PrimaryKey @NonNull public String messageId; public String fromUserId; public String toUserId; public String content; public long timestamp; public int status; // 0: sending, 1: sent, 2: failed}@Daopublic interface MessageDao { @Insert void insert(MessageEntity message); @Query("SELECT * FROM messages ORDER BY timestamp DESC") LiveData<List<MessageEntity>> getAllMessages(); @Update void update(MessageEntity message);}
这个实现包含了:
Protocol Buffers消息定义和序列化Socket连接管理和心跳机制消息发送和接收队列断线重连机制线程安全的消息处理完整的回调接口
您可以根据具体需求进行调整和扩展。