Java GenericObjectPool 对象池化技术--SpringBoot sftp 连接池工具类
一个对象池包含一组已经初始化过且可以使用的对象,而可以在有需求时创建和销毁对象。池的用户可以从池子中取得对象,对其进行操作处理,并在不需要时归还给池子而非直接销毁它。这是一种特殊的工厂对象。 点击查看 MQTT(EMQX) - SpringBoot 创建 mqtt 连接池 点击查看 MQTT(EMQX) - SpringBoot 整合MQTT Demo - 附源代码 BasePooledObjectFactory 对象池化技术 的使用
Pom.xml
<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.7.0</version></dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.7.0</version>
</dependency>
MqttConnection.java
package com.vipsoft.mqtt.pool; public class MqttConnection { private String mqttClient; ; public MqttConnection(String mqttClient) { this.mqttClient = mqttClient; } public String getMqttClient() { return mqttClient; } public void setMqttClient(String mqttClient) { this.mqttClient = mqttClient; } /** * 推送方法消息 */ public void publish(String msg) throws Exception { System.out.println("对象" + mqttClient + ":" + "执行任务" + msg); } @Override public String toString() { return "MqttConnection{" + "id=" + mqttClient + '}'; }}
package com.vipsoft.mqtt.pool;
public class MqttConnection {
private String mqttClient;
;
public MqttConnection(String mqttClient) {
this.mqttClient = mqttClient;
}
public String getMqttClient() {
return mqttClient;
public void setMqttClient(String mqttClient) {
/**
* 推送方法消息
*/
public void publish(String msg) throws Exception {
System.out.println("对象" + mqttClient + ":" + "执行任务" + msg);
@Override
public String toString() {
return "MqttConnection{" + "id=" + mqttClient + '}';
MqttConnectionFactory.java
package com.vipsoft.mqtt.pool;import org.apache.commons.pool2.BasePooledObjectFactory;import org.apache.commons.pool2.PooledObject;import org.apache.commons.pool2.impl.DefaultPooledObject;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.concurrent.atomic.AtomicInteger;public class MqttConnectionFactory extends BasePooledObjectFactory<MqttConnection> { private static final Logger logger = LoggerFactory.getLogger(MqttConnectionFactory.class); // AtomicInteger是一个提供原子操作的Integer类,通过线程安全的方式操作加减 private AtomicInteger counter = new AtomicInteger(); /** * 在对象池中创建对象 * * @return * @throws Exception */ @Override public MqttConnection create() throws Exception { // 实现线程安全避免在高并发的场景下出现clientId重复导致无法创建连接的情况 int count = this.counter.addAndGet(1); MqttConnection mqttConnection = new MqttConnection("MqttConnection:" + count); logger.info("在对象池中创建对象 {}", mqttConnection.toString()); return mqttConnection; } /** * common-pool2 中创建了 DefaultPooledObject 对象对对象池中对象进行的包装。 * 将我们自定义的对象放置到这个包装中,工具会统计对象的状态、创建时间、更新时间、返回时间、出借时间、使用时间等等信息进行统计 * * @param mqttConnection * @return */ @Override public PooledObject<MqttConnection> wrap(MqttConnection mqttConnection) { logger.info("封装默认返回类型 {}", mqttConnection.toString()); return new DefaultPooledObject<>(mqttConnection); } /** * 销毁对象 * * @param p 对象池 * @throws Exception 异常 */ @Override public void destroyObject(PooledObject<MqttConnection> p) throws Exception { logger.info("销毁对象 {}", p.getObject().getMqttClient()); super.destroyObject(p); } /** * 校验对象是否可用 * * @param p 对象池 * @return 对象是否可用结果,boolean */ @Override public boolean validateObject(PooledObject<MqttConnection> p) { logger.info("校验对象是否可用 {}", p.getObject().getMqttClient()); return super.validateObject(p); } /** * 激活钝化的对象系列操作 * * @param p 对象池 * @throws Exception 异常信息 */ @Override public void activateObject(PooledObject<MqttConnection> p) throws Exception { logger.info("激活钝化的对象 {}", p.getObject().getMqttClient()); super.activateObject(p); } /** * 钝化未使用的对象 * * @param p 对象池 * @throws Exception 异常信息 */ @Override public void passivateObject(PooledObject<MqttConnection> p) throws Exception { logger.info("钝化未使用的对象 {}", p.getObject().getMqttClient()); super.passivateObject(p); }}
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class MqttConnectionFactory extends BasePooledObjectFactory<MqttConnection> {
private static final Logger logger = LoggerFactory.getLogger(MqttConnectionFactory.class);
// AtomicInteger是一个提供原子操作的Integer类,通过线程安全的方式操作加减
private AtomicInteger counter = new AtomicInteger();
* 在对象池中创建对象
*
* @return
* @throws Exception
public MqttConnection create() throws Exception {
// 实现线程安全避免在高并发的场景下出现clientId重复导致无法创建连接的情况
int count = this.counter.addAndGet(1);
MqttConnection mqttConnection = new MqttConnection("MqttConnection:" + count);
logger.info("在对象池中创建对象 {}", mqttConnection.toString());
return mqttConnection;
* common-pool2 中创建了 DefaultPooledObject 对象对对象池中对象进行的包装。
* 将我们自定义的对象放置到这个包装中,工具会统计对象的状态、创建时间、更新时间、返回时间、出借时间、使用时间等等信息进行统计
* @param mqttConnection
public PooledObject<MqttConnection> wrap(MqttConnection mqttConnection) {
logger.info("封装默认返回类型 {}", mqttConnection.toString());
return new DefaultPooledObject<>(mqttConnection);
* 销毁对象
* @param p 对象池
* @throws Exception 异常
public void destroyObject(PooledObject<MqttConnection> p) throws Exception {
logger.info("销毁对象 {}", p.getObject().getMqttClient());
super.destroyObject(p);
* 校验对象是否可用
* @return 对象是否可用结果,boolean
public boolean validateObject(PooledObject<MqttConnection> p) {
logger.info("校验对象是否可用 {}", p.getObject().getMqttClient());
return super.validateObject(p);
* 激活钝化的对象系列操作
* @throws Exception 异常信息
public void activateObject(PooledObject<MqttConnection> p) throws Exception {
logger.info("激活钝化的对象 {}", p.getObject().getMqttClient());
super.activateObject(p);
* 钝化未使用的对象
public void passivateObject(PooledObject<MqttConnection> p) throws Exception {
logger.info("钝化未使用的对象 {}", p.getObject().getMqttClient());
super.passivateObject(p);
PoolTest.java
package com.vipsoft.mqtt;import cn.hutool.core.date.DateUtil;import com.vipsoft.mqtt.pool.MqttConnection;import com.vipsoft.mqtt.pool.MqttConnectionFactory;import org.apache.commons.pool2.impl.GenericObjectPool;import org.apache.commons.pool2.impl.GenericObjectPoolConfig;import org.junit.jupiter.api.Test;import org.springframework.boot.test.context.SpringBootTest;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.atomic.AtomicInteger;@SpringBootTestpublic class PoolTest { @Test void basePooledTest() throws InterruptedException { AtomicInteger atomicInteger = new AtomicInteger(); int excutorCount = 15; CountDownLatch countDownLatch = new CountDownLatch(excutorCount); // =====================创建线程池===================== ExecutorService excutor = Executors.newFixedThreadPool(5); // =====================创建对象池===================== // 对象池工厂 MqttConnectionFactory personPoolFactory = new MqttConnectionFactory(); // 对象池配置 GenericObjectPoolConfig<MqttConnection> objectPoolConfig = new GenericObjectPoolConfig<>(); objectPoolConfig.setMaxTotal(50); // 对象池 GenericObjectPool<MqttConnection> mqttPool = new GenericObjectPool<>(personPoolFactory, objectPoolConfig); // =====================测试对象池===================== // 循环100次,从线程池中取多个多线程执行任务,来测试对象池 for (int i = 0; i < excutorCount; i++) { excutor.submit(new Thread(() -> { // 模拟从对象池取出对象,执行任务 MqttConnection mqtt = null; try { // 从对象池取出对象 mqtt = mqttPool.borrowObject(); // 让对象工作 int count = atomicInteger.addAndGet(1); mqtt.publish("Id:" + count + " Time: " + DateUtil.now()); } catch (Exception e) { e.printStackTrace(); } finally { // 回收对象到对象池 if (mqtt != null) { mqttPool.returnObject(mqtt); } countDownLatch.countDown(); } })); } countDownLatch.await(); }}
package com.vipsoft.mqtt;
import cn.hutool.core.date.DateUtil;
import com.vipsoft.mqtt.pool.MqttConnection;
import com.vipsoft.mqtt.pool.MqttConnectionFactory;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@SpringBootTest
public class PoolTest {
@Test
void basePooledTest() throws InterruptedException {
AtomicInteger atomicInteger = new AtomicInteger();
int excutorCount = 15;
CountDownLatch countDownLatch = new CountDownLatch(excutorCount);
// =====================创建线程池=====================
ExecutorService excutor = Executors.newFixedThreadPool(5);
// =====================创建对象池=====================
// 对象池工厂
MqttConnectionFactory personPoolFactory = new MqttConnectionFactory();
// 对象池配置
GenericObjectPoolConfig<MqttConnection> objectPoolConfig = new GenericObjectPoolConfig<>();
objectPoolConfig.setMaxTotal(50);
// 对象池
GenericObjectPool<MqttConnection> mqttPool = new GenericObjectPool<>(personPoolFactory, objectPoolConfig);
// =====================测试对象池=====================
// 循环100次,从线程池中取多个多线程执行任务,来测试对象池
for (int i = 0; i < excutorCount; i++) {
excutor.submit(new Thread(() -> {
// 模拟从对象池取出对象,执行任务
MqttConnection mqtt = null;
try {
// 从对象池取出对象
mqtt = mqttPool.borrowObject();
// 让对象工作
int count = atomicInteger.addAndGet(1);
mqtt.publish("Id:" + count + " Time: " + DateUtil.now());
} catch (Exception e) {
e.printStackTrace();
} finally {
// 回收对象到对象池
if (mqtt != null) {
mqttPool.returnObject(mqtt);
countDownLatch.countDown();
}));
countDownLatch.await();
原文链接:https://www.cnblogs.com/vipsoft/p/17270006.html
本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728