Commit a0512be4 authored by Administrator's avatar Administrator

报文过大,可能导致解码失败,此时使用分包策略

parent 27316b2a
......@@ -128,24 +128,27 @@ public class HttpProxyService extends Service {
}
});
Log.i("weijia", "start G4Proxy front service");
Launcher.startHttpProxyService(3128);
Log.i("weijia", "start private network forward task");
String clientKey = Settings.System.getString(getContentResolver(), Settings.Secure.ANDROID_ID);
ProxyClient.start(Constants.g4ProxyServerHost_1, Constants.g4ProxyServerPort, clientKey);
ProxyClient.start(Constants.g4ProxyServerHost_2, Constants.g4ProxyServerPort, clientKey);
Log.i("weijia", "start G4Proxy front service");
Launcher.startHttpProxyService(3128);
// Log.i("weijia", "start private network forward task");
//
// ProxyClient.start(Constants.g4ProxyServerHost_1, Constants.g4ProxyServerPort, clientKey);
//
// ProxyClient.start(Constants.g4ProxyServerHost_2, Constants.g4ProxyServerPort, clientKey);
//新的代理服务器实现
//默认使用3128,为了不影响之前的代码,先设置为3129
LittelProxyBootstrap.setLittelServerPort(3129);
G4ProxyClient g4ProxyClient = new G4ProxyClient(Constants.g4ProxyServerHost_1, Constants.g4ProxyServerPort, clientKey);
G4ProxyClient g4ProxyClient = new G4ProxyClient("www.scumall.com", 50000, clientKey);
g4ProxyClient.startup();
G4ProxyClient g4ProxyClient2 = new G4ProxyClient(Constants.g4ProxyServerHost_2, Constants.g4ProxyServerPort, clientKey);
g4ProxyClient2.startup();
// G4ProxyClient g4ProxyClient2 = new G4ProxyClient(Constants.g4ProxyServerHost_2, Constants.g4ProxyServerPort, clientKey);
// g4ProxyClient2.startup();
}
}
......@@ -57,7 +57,7 @@ public class G4ProxyClient {
}
});
Bootstrap join2NatServerBootStrap = new Bootstrap();
join2NatServerBootStrap = new Bootstrap();
join2NatServerBootStrap.group(workerGroup);
join2NatServerBootStrap.channel(NioSocketChannel.class);
join2NatServerBootStrap.handler(new ChannelInitializer<SocketChannel>() {
......@@ -70,6 +70,8 @@ public class G4ProxyClient {
ch.pipeline().addLast(new NatClientChannelHandler(G4ProxyClient.this));
}
});
forceReconnect();
}
private volatile boolean isConnecting = false;
......
......@@ -34,16 +34,36 @@ public class LittleProxyChannelHandler extends SimpleChannelInboundHandler<ByteB
channel.close();
return;
}
NatMessage natMessage = new NatMessage();
byte[] bytes = new byte[msg.readableBytes()];
msg.readBytes(bytes);
natMessage.setData(bytes);
natMessage.setSerialNumber(seq);
natMessage.setType(NatMessage.P_TYPE_TRANSFER);
natChannel.writeAndFlush(natMessage);
log.info("receive data from littel proxy");
// 这里需要处理分包的问题,文件下载场景,这个包可能非常大。会导致 io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 1048576: 103335954 - discarded
int maxPackSize = Constant.MAX_FRAME_LENGTH - 128;
while (msg.readableBytes() > maxPackSize) {
byte[] bytes = new byte[maxPackSize];
msg.readBytes(bytes);
NatMessage natMessage = new NatMessage();
natMessage.setData(bytes);
natMessage.setSerialNumber(seq);
natMessage.setType(NatMessage.P_TYPE_TRANSFER);
//write,但是不需要 flush
natChannel.write(natMessage);
log.info("receive data from littel proxy");
}
if (msg.readableBytes() > 0) {
NatMessage natMessage = new NatMessage();
byte[] bytes = new byte[msg.readableBytes()];
msg.readBytes(bytes);
natMessage.setData(bytes);
natMessage.setSerialNumber(seq);
natMessage.setType(NatMessage.P_TYPE_TRANSFER);
natChannel.writeAndFlush(natMessage);
log.info("receive data from littel proxy");
} else {
natChannel.flush();
}
}
......
......@@ -4,6 +4,7 @@ import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
public class Constant {
//单个报文,最大1M,超过1M需要分段
public static final int MAX_FRAME_LENGTH = 1024 * 1024;
public static final int LENGTH_FIELD_OFFSET = 0;
......
......@@ -174,4 +174,11 @@ public class NatServerChannelHandler extends SimpleChannelInboundHandler<NatMess
clientManager.closeClient(clientId);
super.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//TODO 这里应该怎么办???
super.exceptionCaught(ctx, cause);
}
}
......@@ -75,15 +75,33 @@ public class UserMappingChannelHandler extends SimpleChannelInboundHandler<ByteB
return;
}
byte[] bytes = new byte[msg.readableBytes()];
msg.readBytes(bytes);
NatMessage natMessage = new NatMessage();
natMessage.setType(NatMessage.P_TYPE_TRANSFER);
natMessage.setSerialNumber(seq);
natMessage.setData(bytes);
client.getNatChannel().writeAndFlush(natMessage);
log.info("forward data to client:{}", clientId);
int maxPackSize = Constant.MAX_FRAME_LENGTH - 128;
while (msg.readableBytes() > maxPackSize) {
byte[] bytes = new byte[maxPackSize];
msg.readBytes(bytes);
NatMessage natMessage = new NatMessage();
natMessage.setData(bytes);
natMessage.setSerialNumber(seq);
natMessage.setType(NatMessage.P_TYPE_TRANSFER);
//write,但是不需要 flush
client.getNatChannel().write(natMessage);
log.info("receive data from littel proxy");
}
if (msg.readableBytes() > 0) {
NatMessage natMessage = new NatMessage();
byte[] bytes = new byte[msg.readableBytes()];
msg.readBytes(bytes);
natMessage.setData(bytes);
natMessage.setSerialNumber(seq);
natMessage.setType(NatMessage.P_TYPE_TRANSFER);
client.getNatChannel().writeAndFlush(natMessage);
log.info("receive data from littel proxy");
} else {
client.getNatChannel().flush();
}
}
......
......@@ -27,28 +27,32 @@ public class ClientManager {
public boolean registerNewClient(String client, Channel channel) {
NatClientImage natClientImage = natClientImageMap.get(client);
Integer mappingPort;
if (natClientImage != null) {
natClientImage.updateChannel(channel);
return true;
log.warn("update nat channel");
natClientImage.getNatChannel().close();
natClientImage.closeAllUserChannel();
mappingPort = natClientImage.getMappingPort();
} else {
mappingPort = AvailablePortResManager.allocate();
if (mappingPort == null) {
log.error("failed to allocate port");
return false;
}
ChannelFuture channelFuture = userMappingBootstrap.bind(mappingPort);
try {
channelFuture.get();
} catch (Exception e) {
log.error("wait for port binding error", e);
return false;
}
if (!channelFuture.isSuccess()) {
log.warn("bind mapping port:{} failed", mappingPort, channelFuture.cause());
return false;
}
}
Integer mappingPort = AvailablePortResManager.allocate();
if (mappingPort == null) {
log.error("failed to allocate port");
return false;
}
ChannelFuture channelFuture = userMappingBootstrap.bind(mappingPort);
try {
channelFuture.get();
} catch (Exception e) {
log.error("wait for port binding error", e);
return false;
}
if (!channelFuture.isSuccess()) {
log.warn("bind mapping port:{} failed", mappingPort, channelFuture.cause());
return false;
}
channel.attr(Constant.NAT_CHANNEL_CLIENT_KEY).set(client);
clientPortBiMap.put(client, mappingPort);
natClientImage = new NatClientImage(client, mappingPort, channel);
......@@ -62,7 +66,11 @@ public class ClientManager {
}
public NatClientImage getClient(Integer mappingPort) {
return natClientImageMap.get(clientPortBiMap.inverse().get(mappingPort));
String clientId = clientPortBiMap.inverse().get(mappingPort);
if (clientId == null) {
return null;
}
return natClientImageMap.get(clientId);
}
......
......@@ -15,6 +15,7 @@ public class NatClientImage {
@Getter
private String clientId;
private AtomicLong seqGenerator = new AtomicLong(0);
@Getter
private int mappingPort;
@Getter
private Channel natChannel;
......@@ -67,5 +68,6 @@ public class NatClientImage {
for (Channel channel : userMappingChannels.values()) {
releaseConnection(channel);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment