Commit a75f3e07 authored by Administrator's avatar Administrator

update

parent f130feff
......@@ -147,7 +147,6 @@ public class HttpProxyService extends Service {
//新的代理服务器实现
LogbackConfig.config();
//默认使用3128,为了不影响之前的代码,先设置为3129
......
......@@ -14,6 +14,7 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
......@@ -45,7 +46,7 @@ public class G4ProxyClient {
LittelProxyBootstrap.makeSureLittelProxyStartup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("littel-proxy-group" + DefaultThreadFactory.toPoolName(NioEventLoopGroup.class)));
join2LittleProxyBootStrap = new Bootstrap();
join2LittleProxyBootStrap.group(workerGroup);
join2LittleProxyBootStrap.channel(NioSocketChannel.class);
......@@ -57,6 +58,7 @@ public class G4ProxyClient {
}
});
workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("nat-endpoint-group" + DefaultThreadFactory.toPoolName(NioEventLoopGroup.class)));
join2NatServerBootStrap = new Bootstrap();
join2NatServerBootStrap.group(workerGroup);
join2NatServerBootStrap.channel(NioSocketChannel.class);
......
package com.virjar.g4proxy_new.protocol;
import java.util.Map;
import io.netty.channel.Channel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.AttributeKey;
public class Constant {
......@@ -31,4 +34,6 @@ public class Constant {
public static AttributeKey<Channel> NEXT_CHANNEL = AttributeKey.newInstance("next_channel");
public static AttributeKey<Map<Long, Channel>> userMappingChannelForNatChannel = AttributeKey.newInstance("userMappingChannelForNatChannel");
}
......@@ -7,11 +7,11 @@ import com.virjar.g4proxy_new.server.client.ClientManager;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;
......@@ -43,8 +43,8 @@ public class G4ProxyServer {
private void startUpInternal() {
ServerBootstrap natServerBootStrap = new ServerBootstrap();
NioEventLoopGroup serverBossGroup = new NioEventLoopGroup();
NioEventLoopGroup serverWorkerGroup = new NioEventLoopGroup();
NioEventLoopGroup serverBossGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("natServer-boss-group" + DefaultThreadFactory.toPoolName(NioEventLoopGroup.class)));
NioEventLoopGroup serverWorkerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("natServer-worker-group" + DefaultThreadFactory.toPoolName(NioEventLoopGroup.class)));
natServerBootStrap.group(serverBossGroup, serverWorkerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
......@@ -66,8 +66,8 @@ public class G4ProxyServer {
});
ServerBootstrap userMapServerBootStrap = new ServerBootstrap();
serverBossGroup = new NioEventLoopGroup();
serverWorkerGroup = new NioEventLoopGroup();
serverBossGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("userMapping-boss-group" + DefaultThreadFactory.toPoolName(NioEventLoopGroup.class)));
serverWorkerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("userMapping-worker-group" + DefaultThreadFactory.toPoolName(NioEventLoopGroup.class)));
userMapServerBootStrap.group(serverBossGroup, serverWorkerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
......
......@@ -5,6 +5,8 @@ import com.virjar.g4proxy_new.protocol.NatMessage;
import com.virjar.g4proxy_new.server.client.ClientManager;
import com.virjar.g4proxy_new.server.client.NatClientImage;
import java.util.Map;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
......@@ -184,8 +186,21 @@ public class NatServerChannelHandler extends SimpleChannelInboundHandler<NatMess
ctx.close();
return;
}
log.info("the nat channel InActive ,close ");
clientManager.closeClient(clientId);
Map<Long, Channel> userMappingChannel = natChannel.attr(Constant.userMappingChannelForNatChannel).get();
for (Channel channel : userMappingChannel.values()) {
if (channel.isActive()) {
channel.close();
}
}
NatClientImage client = clientManager.getClient(clientId);
Channel natChannel1 = client.getNatChannel();
if (natChannel == natChannel1) {
log.info("the nat channel InActive ,close ");
client.getUserMappingServerChannel().close();
}
// log.info("the nat channel InActive ,close ");
// clientManager.closeClient(clientId);
super.channelInactive(ctx);
}
......
......@@ -18,6 +18,7 @@ import lombok.extern.slf4j.Slf4j;
public class ClientManager {
private Map<String, NatClientImage> natClientImageMap = new ConcurrentHashMap<>();
private BiMap<String, Integer> clientPortBiMap = HashBiMap.create();
private Map<Integer, Channel> userMappingServerChannels = new ConcurrentHashMap<>();
@Setter
private ServerBootstrap userMappingBootstrap;
......@@ -36,6 +37,24 @@ public class ClientManager {
//需要关闭所有的连接,但是计数器不能清零。计数器清零可能导致紊乱
natClientImage.updateChannel(natChannel);
Channel userMappingServerChannel = userMappingServerChannels.get(mappingPort);
if (userMappingServerChannel == null || !userMappingServerChannel.isActive()) {
ChannelFuture channelFuture = userMappingBootstrap.bind(mappingPort);
try {
channelFuture.get();
} catch (Exception e) {
log.error("wait for port binding error", e);
return false;
}
if (!channelFuture.isSuccess()) {
log.warn("bind mapping port:{} failed", mappingPort, channelFuture.cause());
return false;
}
userMappingServerChannels.put(mappingPort, channelFuture.channel());
natClientImage.setUserMappingServerChannel(channelFuture.channel());
}
} else {
// retry with useful port
......@@ -56,6 +75,7 @@ public class ClientManager {
log.warn("bind mapping port:{} failed", mappingPort, channelFuture.cause());
continue;
}
userMappingServerChannels.put(mappingPort, channelFuture.channel());
natClientImage = new NatClientImage(client, mappingPort, natChannel, channelFuture.channel());
break;
}
......@@ -87,16 +107,16 @@ public class ClientManager {
}
public void closeClient(String clientId) {
log.info("close client :{}", clientId);
NatClientImage client = natClientImageMap.remove(clientId);
if (client == null) {
log.error("no client registered for clientId:{}", clientId);
return;
}
client.closeAllUserChannel();
client.getNatChannel().close();
//这样,停止监听对应的代理端口,防止代理请求再次打进来
client.getUserMappingServerChannel().close();
}
// public void closeClient(String clientId) {
// log.info("close client :{}", clientId);
// NatClientImage client = natClientImageMap.remove(clientId);
// if (client == null) {
// log.error("no client registered for clientId:{}", clientId);
// return;
// }
// client.closeAllUserChannel();
// client.getNatChannel().close();
// //这样,停止监听对应的代理端口,防止代理请求再次打进来
// client.getUserMappingServerChannel().close();
// }
}
......@@ -8,6 +8,7 @@ import java.util.concurrent.atomic.AtomicLong;
import io.netty.channel.Channel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
......@@ -21,15 +22,17 @@ public class NatClientImage {
private Channel natChannel;
@Getter
@Setter
private Channel userMappingServerChannel;
private Map<Long, Channel> userMappingChannels = Maps.newConcurrentMap();
// private Map<Long, Channel> userMappingChannels = Maps.newConcurrentMap();
public NatClientImage(String clientId, int mappingPort, Channel natChannel, Channel userMappingServerChannel) {
this.clientId = clientId;
this.mappingPort = mappingPort;
this.natChannel = natChannel;
this.userMappingServerChannel = userMappingServerChannel;
this.natChannel.attr(Constant.userMappingChannelForNatChannel).set(Maps.<Long, Channel>newConcurrentMap());
}
public void updateChannel(Channel channel) {
......@@ -38,12 +41,12 @@ public class NatClientImage {
}
public Channel queryUserMappingChannel(Long seq) {
return userMappingChannels.get(seq);
return natChannel.attr(Constant.userMappingChannelForNatChannel).get().get(seq);
}
public Long onNewConnection(Channel userMappingChannel) {
long seq = seqGenerator.incrementAndGet();
userMappingChannels.put(seq, userMappingChannel);
natChannel.attr(Constant.userMappingChannelForNatChannel).get().put(seq, userMappingChannel);
userMappingChannel.attr(Constant.SERIAL_NUM).set(seq);
userMappingChannel.attr(Constant.USER_MAPPING_CHANNEL_PORT).set(mappingPort);
......@@ -61,7 +64,7 @@ public class NatClientImage {
channel.attr(Constant.USER_MAPPING_CHANNEL_PORT).set(null);
channel.attr(Constant.USER_MAPPING_CLIENT).set(null);
userMappingChannels.remove(seq);
natChannel.attr(Constant.userMappingChannelForNatChannel).get().remove(seq);
if (channel.isActive()) {
channel.close();
......@@ -69,7 +72,7 @@ public class NatClientImage {
}
public void closeAllUserChannel() {
for (Channel channel : userMappingChannels.values()) {
for (Channel channel : natChannel.attr(Constant.userMappingChannelForNatChannel).get().values()) {
releaseConnection(channel);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment