Commit b833c822 authored by Administrator's avatar Administrator

init commit

parents
Pipeline #1739 canceled with stages
log.txt
*.swp
*.settings
*.classpath
*.project
*.iml
*.ipr
*.iws
*.DS_Store
*.orig
target/
.idea/
jmeter.log
lib/
LittleProxy.pro
/bin
*.jks
performance/site/
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>echo-proxy</artifactId>
<groupId>com.virjar.proxy</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>echo-proxy-lib</artifactId>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.android</groupId>
<artifactId>android</artifactId>
<version>4.1.1.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.24</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.virjar.echo.nat.client;
import com.virjar.echo.nat.log.EchoLogger;
import com.virjar.echo.nat.protocol.EchoPacket;
import com.virjar.echo.nat.protocol.PacketCommon;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
public class ClientIdleCheckHandler extends IdleStateHandler {
ClientIdleCheckHandler() {
super(PacketCommon.READ_IDLE_TIME, PacketCommon.WRITE_IDLE_TIME, 0);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
if (IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT == evt) {
EchoLogger.getLogger().info("write idle, write a heartbeat message to server");
EchoPacket proxyMessage = new EchoPacket();
proxyMessage.setType(PacketCommon.TYPE_HEARTBEAT);
ctx.channel().writeAndFlush(proxyMessage);
} else if (IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT == evt) {
//不能在readTimeout的时候,就判定超时。比如在下载大文件的时候,只有数据写。没有写idle发生。也就没有heartbeat的ack。不会产生heartbeat的响应包
EchoLogger.getLogger().info("first read idle, write a heartbeat message to server");
EchoPacket proxyMessage = new EchoPacket();
proxyMessage.setType(PacketCommon.TYPE_HEARTBEAT);
ctx.channel().writeAndFlush(proxyMessage);
} else if (IdleStateEvent.READER_IDLE_STATE_EVENT == evt) {
EchoLogger.getLogger().info("read timeout,close channel");
ctx.channel().close();
EchoLogger.getLogger().info("the cmd channel lost,restart client");
}
super.channelIdle(ctx, evt);
}
}
package com.virjar.echo.nat.client;
import com.virjar.echo.nat.log.EchoLogger;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class ClientToRealServerChannelHandler extends SimpleChannelInboundHandler<ByteBuf> {
private EchoClient echoClient;
ClientToRealServerChannelHandler(EchoClient echoClient) {
this.echoClient = echoClient;
}
private Long checkConnection(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
Long serial = channel.attr(EchoClientConstant.SERIAL_NUM).get();
if (serial == null) {
EchoLogger.getLogger().warn("not serial bound for channel:" + channel);
channel.close();
return null;
}
if (!echoClient.isAlive()) {
EchoLogger.getLogger().warn("nat connection closed,close proxyToServer connection");
channel.close();
return null;
}
return serial;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
Long seq = ctx.channel().attr(EchoClientConstant.SERIAL_NUM).get();
if (seq == null) {
EchoLogger.getLogger().warn("no serial bound for clientToRealServer channel!!");
return;
}
// msg.retain();
echoClient.forwardRealServerResponse(msg, seq);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Long serial = checkConnection(ctx);
if (serial == null) {
return;
}
echoClient.realServerDisconnect(serial);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
EchoLogger.getLogger().error("error occur ", cause);
super.exceptionCaught(ctx, cause);
}
}
package com.virjar.echo.nat.client;
import com.virjar.echo.nat.log.EchoLogger;
import com.virjar.echo.nat.protocol.EchoPacket;
import com.virjar.echo.nat.protocol.EchoPacketDecoder;
import com.virjar.echo.nat.protocol.EchoPacketEncoder;
import com.virjar.echo.nat.protocol.PacketCommon;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public class EchoClient {
private String natServerHost;
private int natServerPort;
private String clientId;
public EchoClient(String natServerHost, int natServerPort, String clientId) {
this.natServerHost = natServerHost;
this.natServerPort = natServerPort;
this.clientId = clientId;
}
private Bootstrap echoClientBootstrap;
private Bootstrap echoClientToRealServerBootstrap;
private Map<Long, Channel> allRealServerChannels = new ConcurrentHashMap<>();
ChannelFuture connectToRealServer(final Long seq, final String host, final Integer port) {
ChannelFuture connect = echoClientToRealServerBootstrap.connect(host, port);
connect.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
return;
}
allRealServerChannels.put(seq, future.channel());
future.channel().attr(EchoClientConstant.SERIAL_NUM).set(seq);
EchoLogger.getLogger().info("create tcp connection success for host->"
+ host + ":" + port + " for request: " + seq);
// now notify nat server the connection create ready!!
EchoPacket echoPacket = new EchoPacket();
echoPacket.setType(PacketCommon.TYPE_CONNECT_READY);
echoPacket.setSerialNumber(seq);
echoPacket.setExtra(getClientId());
EchoClient.this.natChannel.writeAndFlush(echoPacket);
}
});
return connect;
}
void closeRealServerConnection(Long seq) {
final Channel realServerChannel = allRealServerChannels.remove(seq);
if (realServerChannel == null) {
EchoLogger.getLogger().warn("can not find real server connection for request: " + seq);
return;
}
realServerChannel
.writeAndFlush(Unpooled.EMPTY_BUFFER)// 目的是保证所有的数据,都正确写入到server。否则可能还么没有写入到server,就关闭了请求
.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) {
realServerChannel.close();
}
});
}
void forwardNatServerRequest(ChannelHandlerContext ctx, EchoPacket echoPacket) {
Channel channel = allRealServerChannels.get(echoPacket.getSerialNumber());
if (channel == null) {
EchoLogger.getLogger().warn("can not find real server connection for request: " + echoPacket.getSerialNumber());
return;
}
byte[] data = echoPacket.getData();
ByteBuf buf = ctx.alloc().buffer(data.length);
buf.writeBytes(data);
channel.writeAndFlush(buf);
}
void forwardRealServerResponse(ByteBuf msg, Long seq) {
int maxPackSize = PacketCommon.MAX_FRAME_LENGTH - 128;
while (msg.readableBytes() > maxPackSize) {
byte[] bytes = new byte[maxPackSize];
msg.readBytes(bytes);
EchoPacket natMessage = new EchoPacket();
natMessage.setData(bytes);
natMessage.setSerialNumber(seq);
natMessage.setType(PacketCommon.TYPE_TRANSFER);
//write,但是不需要 flush
natChannel.write(natMessage);
EchoLogger.getLogger().info("receive data from real server endpoint with big packet, forward to natChannel");
}
if (msg.readableBytes() > 0) {
EchoPacket natMessage = new EchoPacket();
byte[] bytes = new byte[msg.readableBytes()];
msg.readBytes(bytes);
natMessage.setData(bytes);
natMessage.setSerialNumber(seq);
natMessage.setType(PacketCommon.TYPE_TRANSFER);
natChannel.writeAndFlush(natMessage);
} else {
natChannel.flush();
}
}
void realServerDisconnect(Long seq) {
EchoPacket echoPacket = new EchoPacket();
echoPacket.setType(PacketCommon.TYPE_DISCONNECT);
echoPacket.setSerialNumber(seq);
natChannel.writeAndFlush(echoPacket);
allRealServerChannels.remove(seq);
}
public void startUp() {
NioEventLoopGroup workerGroup = new NioEventLoopGroup(
0,
new DefaultThreadFactory("nat-endpoint-group" + DefaultThreadFactory.toPoolName(NioEventLoopGroup.class))
//, NioUdtProvider.BYTE_PROVIDER
);
echoClientBootstrap = new Bootstrap();
echoClientBootstrap.group(workerGroup);
echoClientBootstrap.channel(NioSocketChannel.class);
echoClientBootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(EchoClientConstant.ECHO_DECODER, new EchoPacketDecoder());
ch.pipeline().addLast(EchoClientConstant.ECHO_ENCODER, new EchoPacketEncoder());
ch.pipeline().addLast(EchoClientConstant.ECHO_IDLE, new ClientIdleCheckHandler());
ch.pipeline().addLast(EchoClientConstant.ECHO_NAT, new EchoClientChannelHandler(EchoClient.this));
}
});
workerGroup = new NioEventLoopGroup(
1,
new DefaultThreadFactory("proxy-endpoint-group" + DefaultThreadFactory.toPoolName(NioEventLoopGroup.class))
//, NioUdtProvider.BYTE_PROVIDER
);
echoClientToRealServerBootstrap = new Bootstrap();
echoClientToRealServerBootstrap.group(workerGroup);
echoClientToRealServerBootstrap.channel(NioSocketChannel.class);
echoClientToRealServerBootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new ClientToRealServerChannelHandler(EchoClient.this));
}
});
connectNatServer();
}
private volatile boolean isConnecting = false;
private Channel natChannel;
boolean isAlive() {
return natChannel != null && natChannel.isActive();
}
public void connectNatServer() {
Channel cmdChannelCopy = natChannel;
if (cmdChannelCopy != null && cmdChannelCopy.isActive()) {
EchoLogger.getLogger().info("cmd channel active, and close channel,heartbeat timeout ?");
return;
}
if (isConnecting) {
EchoLogger.getLogger().warn("connect event fire already");
return;
}
isConnecting = true;
EchoLogger.getLogger().info("connect to nat server...");
echoClientBootstrap
.connect(natServerHost, natServerPort)
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) {
isConnecting = false;
if (!channelFuture.isSuccess()) {
EchoLogger.getLogger().warn("connect to nat server failed", channelFuture.cause());
echoClientBootstrap.group().schedule(new Runnable() {
@Override
public void run() {
EchoLogger.getLogger().info("connect to nat server failed, reconnect by scheduler task start");
connectNatServer();
}
}, reconnectWait(), TimeUnit.MILLISECONDS);
} else {
sleepTimeMill = 1000;
natChannel = channelFuture.channel();
EchoLogger.getLogger().info("connect to nat server success:" + natChannel);
EchoPacket proxyMessage = new EchoPacket();
proxyMessage.setType(PacketCommon.C_TYPE_REGISTER);
proxyMessage.setExtra(clientId);
natChannel.writeAndFlush(proxyMessage);
}
}
});
}
private static long sleepTimeMill = 1000;
private static long reconnectWait() {
if (sleepTimeMill > 120000) {
sleepTimeMill = 120000;
}
synchronized (EchoClient.class) {
sleepTimeMill = sleepTimeMill + 1000;
return sleepTimeMill;
}
}
public String getClientId() {
return clientId;
}
}
package com.virjar.echo.nat.client;
import com.virjar.echo.nat.log.EchoLogger;
import com.virjar.echo.nat.protocol.EchoPacket;
import com.virjar.echo.nat.protocol.PacketCommon;
import io.netty.channel.*;
import java.util.concurrent.TimeUnit;
public class EchoClientChannelHandler extends SimpleChannelInboundHandler<EchoPacket> {
private EchoClient echoClient;
EchoClientChannelHandler(EchoClient echoClient) {
this.echoClient = echoClient;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, EchoPacket msg) {
EchoLogger.getLogger().info("received message from server: " + PacketCommon.getReadableType(msg.getType()));
switch (msg.getType()) {
case PacketCommon.TYPE_HEARTBEAT:
handleHeartbeatMessage();
break;
case PacketCommon.TYPE_CONNECT:
handleConnect(ctx, msg);
break;
case PacketCommon.TYPE_TRANSFER:
handleTransfer(ctx, msg);
break;
case PacketCommon.TYPE_DISCONNECT:
handleDisconnectMessage(msg);
break;
default:
EchoLogger.getLogger().warn("Unknown message type:" + msg.getType());
break;
}
}
private void handleDisconnectMessage(EchoPacket msg) {
// 关闭真实服务器的链接,比如baidu.com:443
echoClient.closeRealServerConnection(msg.getSerialNumber());
}
private void handleConnect(final ChannelHandlerContext ctx, final EchoPacket msg) {
String hostAndPort = msg.getExtra();
if (hostAndPort == null) {
EchoLogger.getLogger().error("empty Host:Port config for connect command from nat server");
disconnect(ctx, msg, "empty Host:Port config");
return;
}
hostAndPort = hostAndPort.trim();
if (hostAndPort.isEmpty()) {
EchoLogger.getLogger().error("empty Host:Port config for connect command from nat server");
disconnect(ctx, msg, "empty Host:Port config ");
return;
}
String[] split = hostAndPort.split(":");
if (split.length != 2) {
EchoLogger.getLogger().error("error Host:Port config for connect command from nat server: " + hostAndPort);
disconnect(ctx, msg, "error Host:Port config");
return;
}
String host = split[0];
int port;
try {
port = Integer.parseInt(split[1]);
} catch (NumberFormatException e) {
EchoLogger.getLogger().error("error Host:Port config for connect command from nat server: " + hostAndPort
, e);
disconnect(ctx, msg, "error Host:Port config");
return;
}
ChannelFuture proxyToServerConnection;
EchoLogger.getLogger().info("create a new connection for-> "
+ host + ":" + port + " for request id: " + msg.getSerialNumber());
// create a connect
try {
proxyToServerConnection = echoClient.connectToRealServer(msg.getSerialNumber(), host, port);
} catch (Exception e) {
//这里是IP:port端口配置不合法
EchoLogger.getLogger().error("error Host:Port config", e);
disconnect(ctx, msg, "error Host:Port config:" + hostAndPort + " error:" + e.getMessage());
return;
}
proxyToServerConnection.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(final ChannelFuture proxyToServerFuture) {
if (!proxyToServerFuture.isSuccess()) {
EchoLogger.getLogger().warn("connect to real server failed", proxyToServerFuture.cause());
disconnect(ctx, msg, proxyToServerFuture.cause().getMessage());
return;
}
if (!echoClient.isAlive()) {
EchoLogger.getLogger().warn("nat channel closed after create server connection completed");
disconnect(ctx, msg, "nat channel closed after create server connection completed");
proxyToServerFuture.channel().close();
}
}
});
}
private void handleTransfer(ChannelHandlerContext ctx, EchoPacket msg) {
echoClient.forwardNatServerRequest(ctx, msg);
}
private void disconnect(ChannelHandlerContext ctx, EchoPacket msg, String errorMsg) {
EchoPacket disconnectCmd = new EchoPacket();
disconnectCmd.setSerialNumber(msg.getSerialNumber());
disconnectCmd.setType(PacketCommon.TYPE_DISCONNECT);
if (errorMsg != null) {
disconnectCmd.setData(errorMsg.getBytes());
}
ctx.writeAndFlush(msg);
}
private void handleHeartbeatMessage() {
EchoLogger.getLogger().info("receive heartbeat message from nat server");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
EchoLogger.getLogger().warn("client disconnected: " + ctx.channel() + " prepare to reconnect");
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
echoClient.connectNatServer();
}
}, 1500, TimeUnit.MILLISECONDS);
}
}
package com.virjar.echo.nat.client;
import io.netty.util.AttributeKey;
public class EchoClientConstant {
public static AttributeKey<Long> SERIAL_NUM = AttributeKey.newInstance("connection_serial");
public static final String ECHO_ENCODER = "echo-encoder";
public static final String ECHO_DECODER = "echo-decoder";
public static final String ECHO_IDLE = "echo-idle";
public static final String ECHO_NAT = "echo-nat";
}
package com.virjar.echo.nat.log;
import android.util.Log;
public class AndroidLogger implements ILogger {
@Override
public void info(String msg) {
Log.i(EchoLogger.tag, msg);
}
@Override
public void info(String msg, Throwable throwable) {
Log.i(EchoLogger.tag, msg, throwable);
}
@Override
public void warn(String msg) {
Log.w(EchoLogger.tag, msg);
}
@Override
public void warn(String msg, Throwable throwable) {
Log.w(EchoLogger.tag, msg, throwable);
}
@Override
public void error(String msg) {
Log.e(EchoLogger.tag, msg);
}
@Override
public void error(String msg, Throwable throwable) {
Log.e(EchoLogger.tag, msg, throwable);
}
}
package com.virjar.echo.nat.log;
import android.util.Log;
import org.slf4j.LoggerFactory;
public class EchoLogger {
@SuppressWarnings("all")
public static String tag = "Echo";
private static ILogger logger = null;
static {
genLogger();
}
public static void setLogger(ILogger logger) {
if (logger == null) {
throw new IllegalArgumentException("input logger can not be null");
}
EchoLogger.logger = logger;
}
private static void genLogger() {
try {
Log.i(tag, "test sekiro log");
logger = new AndroidLogger();
return;
} catch (Throwable throwable) {
//ignore
}
try {
logger = (ILogger) LoggerFactory.getLogger(tag);
return;
} catch (Throwable throwable) {
//ignore
}
try {
LoggerFactory.getLogger(tag).info("test sekiro log");
logger = new Slf4jLogger();
return;
} catch (Throwable throwable) {
//ignore
}
logger = new SystemOutLogger();
}
public static ILogger getLogger() {
return logger;
}
}
package com.virjar.echo.nat.log;
public interface ILogger {
void info(String msg);
void info(String msg, Throwable throwable);
void warn(String msg);
void warn(String msg, Throwable throwable);
void error(String msg);
void error(String msg, Throwable throwable);
}
package com.virjar.echo.nat.log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Slf4jLogger implements ILogger {
private static final Logger logger = LoggerFactory.getLogger(EchoLogger.tag);
@Override
public void info(String msg) {
logger.info(msg);
}
@Override
public void info(String msg, Throwable throwable) {
logger.info(msg, throwable);
}
@Override
public void warn(String msg) {
logger.warn(msg);
}
@Override
public void warn(String msg, Throwable throwable) {
logger.warn(msg, throwable);
}
@Override
public void error(String msg) {
logger.error(msg);
}
@Override
public void error(String msg, Throwable throwable) {
logger.error(msg, throwable);
}
}
package com.virjar.echo.nat.log;
public class SystemOutLogger implements ILogger {
@Override
public void info(String msg) {
System.out.println(msg);
}
@Override
public void info(String msg, Throwable throwable) {
System.out.println(msg);
throwable.printStackTrace(System.out);
}
@Override
public void warn(String msg) {
System.out.println(msg);
}
@Override
public void warn(String msg, Throwable throwable) {
System.out.println(msg);
throwable.printStackTrace(System.out);
}
@Override
public void error(String msg) {
System.err.println(msg);
}
@Override
public void error(String msg, Throwable throwable) {
System.err.println(msg);
throwable.printStackTrace(System.err);
}
}
package com.virjar.echo.nat.protocol;
import lombok.Data;
@Data
public class EchoPacket {
/**
* 消息类型
*/
private byte type;
/**
* 消息流水号
*/
private long serialNumber;
/**
* 额外参数,其含义在不同的消息类型下,有不同的作用
*/
private String extra;
/**
* 消息传输数据
*/
private byte[] data;
}
package com.virjar.echo.nat.protocol;
import com.virjar.echo.nat.log.EchoLogger;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class EchoPacketDecoder extends ByteToMessageDecoder {
private static final byte HEADER_SIZE = 12;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
EchoLogger.getLogger().info("decode data for: " + ctx.channel());
while (true) {
if (in.readableBytes() < HEADER_SIZE) {
return;
}
int originIndex = in.readerIndex();
long magic = in.readLong();
if (magic != PacketCommon.magic) {
ctx.close();
throw new IllegalStateException("package not start with legal magic: " + magic + " expected:" + PacketCommon.magic);
}
int frameLength = in.readInt();
if (in.readableBytes() < frameLength) {
EchoLogger.getLogger().info("not enough data received: readableBytes->" + in.readableBytes()
+ " frameLength->" + frameLength
);
in.readerIndex(originIndex);
return;
}
try {
EchoPacket echoPacket = new EchoPacket();
byte type = in.readByte();
long sn = in.readLong();
echoPacket.setSerialNumber(sn);
echoPacket.setType(type);
byte extraByteLength = in.readByte();
byte[] extraBytes = new byte[extraByteLength];
in.readBytes(extraBytes);
echoPacket.setExtra(new String(extraBytes));
int dataLength = frameLength - PacketCommon.TYPE_SIZE - PacketCommon.SERIAL_NUMBER_SIZE - PacketCommon.EXTRA_LENGTH_SIZE - extraByteLength;
if (dataLength < 0) {
EchoLogger.getLogger().error("message protocol error,negative data length:" + dataLength + " for channel: " + ctx.channel()
+ " frameLength: " + frameLength + " type:" + type + " serial_number:" + sn + " uriLength:" + extraByteLength + " extra:" + echoPacket.getExtra()
);
ctx.channel().close();
return;
}
if (dataLength > 0) {
byte[] data = new byte[dataLength];
in.readBytes(data);
echoPacket.setData(data);
}
out.add(echoPacket);
} catch (Exception e) {
EchoLogger.getLogger().error("message decode failed for channel: " + ctx.channel(), e);
ctx.channel().close();
}
}
}
}
package com.virjar.echo.nat.protocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class EchoPacketEncoder extends MessageToByteEncoder<EchoPacket> {
@Override
protected void encode(ChannelHandlerContext ctx, EchoPacket msg, ByteBuf out) {
int bodyLength = PacketCommon.TYPE_SIZE + PacketCommon.SERIAL_NUMBER_SIZE + PacketCommon.EXTRA_LENGTH_SIZE;
byte[] extraBytes = null;
if (msg.getExtra() != null) {
extraBytes = msg.getExtra().getBytes();
if (extraBytes.length > 127) {
throw new IllegalStateException("too length extra data: " + msg.getExtra());
}
bodyLength += extraBytes.length;
}
if (msg.getData() != null) {
bodyLength += msg.getData().length;
}
//避免无效的攻击
out.writeLong(PacketCommon.magic);
out.writeInt(bodyLength);
out.writeByte(msg.getType());
out.writeLong(msg.getSerialNumber());
if (extraBytes != null) {
out.writeByte((byte) extraBytes.length);
out.writeBytes(extraBytes);
} else {
out.writeByte((byte) 0x00);
}
if (msg.getData() != null) {
out.writeBytes(msg.getData());
}
}
}
package com.virjar.echo.nat.protocol;
public class PacketCommon {
/// 编码相关
static final int TYPE_SIZE = 1;
static final int SERIAL_NUMBER_SIZE = 8;
static final int EXTRA_LENGTH_SIZE = 1;
static final String magicString = "echo.001";
// magic is the hexData of 'magicString'
static final long magic = 0x6563686F2E303031L;
//单个报文,最大1M,超过1M需要分段
public static final int MAX_FRAME_LENGTH = 1024 * 1024;
//心跳相关
public static int READ_IDLE_TIME = 30;
public static int WRITE_IDLE_TIME = 5;
/// 消息类型相关
public static String getReadableType(byte type) {
switch (type) {
case C_TYPE_REGISTER:
return "C_TYPE_REGISTER";
case TYPE_HEARTBEAT:
return "TYPE_HEARTBEAT";
case TYPE_CONNECT:
return "TYPE_CONNECT";
case TYPE_DISCONNECT:
return "TYPE_DISCONNECT";
case TYPE_TRANSFER:
return "TYPE_TRANSFER";
case C_TYPE_CONTROL:
return "C_TYPE_CONTROL";
case TYPE_CONNECT_READY:
return "TYPE_CONNECT_READY";
default:
return "UNKNOWN";
}
}
/**
* 客户端连接服务器,将会带有客户端id
*/
public static final byte C_TYPE_REGISTER = 0x01;
/**
* 心跳消息
*/
public static final byte TYPE_HEARTBEAT = 0x02;
/**
* 代理请求连接建立,需要在 http proxy server建立channel。该消息仅发生在nat server-> nat client
*/
public static final byte TYPE_CONNECT = 0x03;
/**
* 代理端主动关闭连接,将会关闭对应channel。该消息仅发生在 nat client -> nat server
*/
public static final byte TYPE_DISCONNECT = 0x04;
/**
* 数据传输,消息需要带流水号
*/
public static final byte TYPE_TRANSFER = 0x05;
/**
* 控制命令,目前没有使用。用来执行一些在手机本地执行的命令。如控制飞行模式切换,实现重播等
*/
public static final byte C_TYPE_CONTROL = 0x06;
/**
* 新的链接建立成功后,会给服务器回一个信息。预示着从客户端到真正的server之间的tcp链接真正建立完成
*/
public static final byte TYPE_CONNECT_READY = 0x07;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment