first commit

This commit is contained in:
Ray
2026-02-19 03:37:37 +08:00
commit ccfd8c79a4
2813 changed files with 453657 additions and 0 deletions

View File

@@ -0,0 +1,3 @@
Manifest-Version: 1.0
Class-Path:

View File

@@ -0,0 +1,133 @@
//package project.redis;
//
//
//
//import java.io.InputStream;
//import java.io.UnsupportedEncodingException;
//import java.util.Properties;
//
//import org.apache.commons.logging.Log;
//import org.apache.commons.logging.LogFactory;
///**
// * 读取Properties综合类,默认绑定到classpath下的config.properties文件。
// */
//public class PropertiesUtilRedis {
// private static Log log = LogFactory.getLog(PropertiesUtilRedis.class);
// private static String CONFIG_FILENAME = "config/redis.properties";
// private static Properties prop = null;
//
// public PropertiesUtilRedis() {
// if (prop == null) {
// loadProperties();
// }
// };
//
// private synchronized static void loadProperties() {
// byte buff[]=null;
// try {
// //Open the props file
// InputStream is=PropertiesUtilRedis.class.getResourceAsStream("/" + CONFIG_FILENAME);
// prop = new Properties();
// //Read in the stored properties
// prop.load(is);
// }
// catch (Exception e) {
// System.err.println("读取配置文件失败!!!");
// prop = null;
// log.error(e.getMessage(), e);
// }
// }
//
// /**
// * 得到属性值
// * @param key
// * @return
// */
// public static String getProperty(String key) {
// if (prop == null) {
// loadProperties();
// }
//
// String value = prop.getProperty(key);
// if(value ==null){
// return null;
// }
// return value.trim();
// }
//
// /**
// * 得到内容包含汉字的属性值
// * @param key
// * @return
// */
// public static String getGBKProperty(String key) {
// String value = getProperty(key);
// try {
// value = new String(value.getBytes("ISO8859-1"),"GBK");
// } catch (UnsupportedEncodingException e) {
// }
//
// if(value ==null){
// return null;
// }
// return value.trim();
// }
//
// /**
// * 得到属性值,
// * @param key
// * @param defaultValue
// * @return
// */
// public static String getProperty(String key, String defaultValue) {
// if (prop == null) {
// loadProperties();
// }
//
// String value = prop.getProperty(key, defaultValue);
// if(value ==null){
// return null;
// }
// return value.trim();
// }
//
// /**
// * 得到内容包含汉字的属性值,如果不存在则使用默认值
// * @param key
// * @return
// */
// public static String getGBKProperty(String key, String defaultValue) {
// try {
// defaultValue = new String(defaultValue.getBytes("GBK"), "ISO8859-1");
// String value = getProperty(key, defaultValue);
// value = new String(value.getBytes("ISO8859-1"), "GBK");
//
// if (value == null) {
// return null;
// }
// return value.trim();
// } catch (UnsupportedEncodingException e) {
// return null;
// }
// }
//
// public static String getUTFProperty(String key, String defaultValue) {
// try {
// defaultValue = new String(defaultValue.getBytes("UTF-8"),
// "ISO8859-1");
// String value = getProperty(key, defaultValue);
// value = new String(value.getBytes("ISO8859-1"), "UTF-8");
//
// if (value == null) {
// return null;
// }
// return value.trim();
// } catch (UnsupportedEncodingException e) {
// return null;
// }
// }
//
// public static void main(String[] args) {
// System.out.println(PropertiesUtilRedis.getProperty("mail.username"));
// }
//}

View File

@@ -0,0 +1,128 @@
package project.redis;
import project.redis.interal.KeyValue;
import java.util.Map;
import java.util.Set;
public interface RedisHandler {
/*
* Object get set dell
*/
/**
* get
*
* @param key
*/
public Object get(String key);
String getString(String key);
/**
* 批量get与单个get存在性能区别一次连接redispool遍历取到数据后返回
*/
public Object[] getList(String[] keys);
/**
* set 同步
*
* @param key
* @param object
*/
public void setSync(String key, Object object);
/**
* set 批量同步
*
* @param params 需要写入的 k-v 数据
*/
public void setBatchSync(Map<String, Object> params);
/**
* set 异步
*
* @param key
* @param object
*/
public void setAsyn(String key, Object object);
public void remove(String key);
/*
* 队列Queue push poll
*/
/**
* push 同步
*
*/
public void pushSync(String key, Object object);
/**
* push 异步。批量处理在业务里考虑
*
*/
public void pushAsyn(String key, Object object);
/**
* 从队列尾取一个Object,如果为空则返回null。
*/
public Object poll(String key);
void setSyncLong(String key, long object);
void setSyncString(String key, String object);
/**
* set 同步 过期时间
*
* @param key
* @param object
* @param time 过期时间
*/
void setSyncStringEx(String key, String object, int time);
public void zadd(String key, double score, String member);
void zincrby(String key, double score, String member);
public Set<KeyValue<String, Double>> zRange(String key, double min, double max);
public Long zrem(String key, String member);
public Double zscore(String key, String member);
public void put(String key, String field, String value);
public String get(String key, String field);
public boolean remove(String key, String field);
/**
* redis 锁。
*/
boolean lock(String key,int seconds);
boolean exists(String key);
void incr(String key);
boolean setNx(String key, String value, int timeoutSeconds);
void expireKey(String key, int seconds);
/**
* 返回指定 key 还有多久的生存时间,单位为 秒
* 返回值 -1 代表永不过期, -2 代表 key 不存在
*
* @param key
* @return
*/
int ttl(String key);
public void sadd(String key, String member);
public void srem(String key, String member);
}

View File

@@ -0,0 +1,52 @@
package project.redis.interal;
import java.io.Serializable;
public class AsynItem implements Serializable {
private static final long serialVersionUID = -6863297417530461916L;
public final static String TYPE_MAP = "MAP_CACHE";
public final static String TYPE_QUEUE = "QUEUE_CACHE";
private String key;
private Object object;
private String type;
public AsynItem() {
}
public AsynItem(String key, Object object, String type) {
this.key = key;
this.object = object;
this.type = type;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public Object getObject() {
return object;
}
public void setObject(Object object) {
this.object = object;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
}

View File

@@ -0,0 +1,35 @@
package project.redis.interal;
import kernel.util.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentLinkedQueue;
public class AsynItemQueue {
private static final Logger logger = LoggerFactory.getLogger(AsynItemQueue.class);
private static ConcurrentLinkedQueue<AsynItem> WORKING_EVENTS = new ConcurrentLinkedQueue<AsynItem>();
public static void add(AsynItem item) {
try {
WORKING_EVENTS.add(item);
} catch (Throwable e) {
logger.error("add(AsynItem item) fail : {}", JsonUtils.bean2Json(item), e);
}
}
public static int size() {
return WORKING_EVENTS.size();
}
public static AsynItem poll() {
AsynItem item = null;
try {
item = WORKING_EVENTS.poll();
} catch (Throwable e) {
logger.error("AsynItem poll() fail : ", e);
}
return item;
}
}

View File

@@ -0,0 +1,83 @@
package project.redis.interal;
import java.io.Serializable;
import java.util.Objects;
/**
* 键值对对象,只能在构造时传入键值
*
* @param <K> 键类型
* @param <V> 值类型
*/
public class KeyValue<K, V> implements Serializable {
private static final long serialVersionUID = 1L;
protected K key;
protected V value;
/**
* 构建{@code Pair}对象
*
* @param <K> 键类型
* @param <V> 值类型
* @param key 键
* @param value 值
* @return {@code Pair}
* @since 5.4.3
*/
public static <K, V> KeyValue<K, V> of(K key, V value) {
return new KeyValue<>(key, value);
}
/**
* 构造
*
* @param key 键
* @param value 值
*/
public KeyValue(K key, V value) {
this.key = key;
this.value = value;
}
/**
* 获取键
*
* @return 键
*/
public K getKey() {
return this.key;
}
/**
* 获取值
*
* @return 值
*/
public V getValue() {
return this.value;
}
@Override
public String toString() {
return "KeyValue [key=" + key + ", value=" + value + "]";
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o instanceof cn.hutool.core.lang.Pair) {
cn.hutool.core.lang.Pair<?, ?> pair = (cn.hutool.core.lang.Pair<?, ?>) o;
return Objects.equals(getKey(), pair.getKey()) &&
Objects.equals(getValue(), pair.getValue());
}
return false;
}
@Override
public int hashCode() {
//copy from 1.8 HashMap.Node
return Objects.hashCode(key) ^ Objects.hashCode(value);
}
}

View File

@@ -0,0 +1,26 @@
package project.redis.interal;
import java.util.Date;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import project.log.SysLog;
import project.log.SysLogService;
public class OffLineEventRejectExecutingHandler implements RejectedExecutionHandler {
private SysLogService sysLogService;
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
SysLog entity = new SysLog();
entity.setLevel(SysLog.level_error);
entity.setCreateTime(new Date());
entity.setLog("RedisHandlerImpl处理线程池溢出数据被丢弃请调整线程参数。");
sysLogService.saveAsyn(entity);
}
public void setSysLogService(SysLogService sysLogService) {
this.sysLogService = sysLogService;
}
}

View File

@@ -0,0 +1,551 @@
package project.redis.interal;
import java.util.*;
import java.util.Map.Entry;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.util.StrUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.lang.Nullable;
import org.springframework.util.ObjectUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.ParserConfig;
import com.alibaba.fastjson.serializer.SerializerFeature;
import redis.clients.jedis.JedisShardInfo;
import redis.clients.jedis.ShardedJedis;
import redis.clients.jedis.ShardedJedisPool;
import redis.clients.jedis.Tuple;
/**
* redis操作封装
*
*/
public class Redis implements InitializingBean, DisposableBean {
private Logger logger = LogManager.getLogger(this.getClass().getName());
private ShardedJedisPool jedisPool;
@Nullable
private String address;
@Nullable
private String password;
@Nullable
private String testOnBorrow;
@Nullable
private String testOnReturn;
@Nullable
private String testWhileIdle;
@Nullable
private String maxIdle;
@Nullable
private String minIdle;
@Nullable
private String maxActive;
@Nullable
private String maxWait;
@Nullable
private String timeout;
@Nullable
private String numTestsPerEvictionRun;
@Nullable
private String timeBetweenEvictionRunsMillis;
@Nullable
private String minEvictableIdleTimeMillis;
/*
* Object get set dell
*/
/**
* get
*
* @param key
*/
public Object get(String key) {
if (StringUtils.isBlank(key)) {
return null;
}
ShardedJedis jedis = jedisPool.getResource();
try {
String value = jedis.get(key);
if (value == null) {
return null;
}
return JSON.parse(value);
} finally {
jedisPool.returnResource(jedis);
}
}
public String getString(String key) {
if (StringUtils.isBlank(key)) {
return null;
}
ShardedJedis jedis = jedisPool.getResource();
try {
String value = jedis.get(key);
if (value == null) {
return null;
}
return value;
} finally {
jedisPool.returnResource(jedis);
}
}
/**
* 批量get与单个get存在性能区别一次连接redispool遍历取到数据后返回
*/
public Object[] getList(String[] keys) {
ShardedJedis jedis = jedisPool.getResource();
try {
int length = keys.length;
Object[] resultObjects = new Object[length];
for (int i = 0; i < length; i++) {
String value = jedis.get(keys[i]);
resultObjects[i] = (value == null ? null : JSON.parse(value));
}
return resultObjects;
} finally {
jedisPool.returnResource(jedis);
}
}
/**
* set 同步
*
* @param key
* @param object
*/
public void setSync(String key, Object object) {
ShardedJedis jedis = jedisPool.getResource();
try {
jedis.set(key, JSON.toJSONString(object, SerializerFeature.WriteClassName));
} finally {
jedisPool.returnResource(jedis);
}
}
/**
* set 批量同步
*
* @param params 需要写入的 k-v 数据
*/
public void setBatchSync(Map<String, Object> params) {
ShardedJedis jedis = jedisPool.getResource();
try {
Iterator<Map.Entry<String, Object>> iterator = params.entrySet().iterator();
while (iterator.hasNext()) {
Entry<String, Object> entry = iterator.next();
jedis.set(entry.getKey(), JSON.toJSONString(entry.getValue(), SerializerFeature.WriteClassName));
}
} finally {
jedisPool.returnResource(jedis);
}
}
public void remove(String key) {
ShardedJedis jedis = jedisPool.getResource();
try {
jedis.del(key);
} finally {
jedisPool.returnResource(jedis);
}
}
/*
* 队列Queue push put
*/
/**
* push 同步
*
*/
public void pushSync(String key, Object object) {
ShardedJedis jedis = jedisPool.getResource();
try {
jedis.lpush(key, JSON.toJSONString(object, SerializerFeature.WriteClassName));
} finally {
jedisPool.returnResource(jedis);
}
}
/**
* push 批量同步
*
*/
public void pushBatchSync(List<Map<String, Object>> params) {
ShardedJedis jedis = jedisPool.getResource();
try {
for (Map<String, Object> map:params) {
for(Entry<String, Object> entry:map.entrySet()) {
jedis.lpush(entry.getKey(), JSON.toJSONString(entry.getValue(), SerializerFeature.WriteClassName));
}
}
} finally {
jedisPool.returnResource(jedis);
}
}
/**
* 从队列尾取一个Object,如果为空则在timeout返回null。立即返回则timeout设置为0
*
*/
public Object poll(String key) {
ShardedJedis jedis = jedisPool.getResource();
try {
String value = jedis.rpop(key);
if (value == null || value.equals("nil")) {
return null;
}
return JSON.parse(value);
} finally {
jedisPool.returnResource(jedis);
}
}
public void setSyncLong(String key, long object) {
setSyncString(key,String.valueOf(object));
}
public void setSyncString(String key, String object) {
ShardedJedis jedis = jedisPool.getResource();
try {
jedis.set(key, object);
} finally {
jedisPool.returnResource(jedis);
}
}
/**
* 存储带过期时间的key
*
* @param key key
* @param object object
* @param time 过期时间
*/
public void setSyncStringEx(String key, String object, int time) {
ShardedJedis jedis = jedisPool.getResource();
try {
jedis.setex(key, time, object);
} finally {
jedisPool.returnResource(jedis);
}
}
/**
* @author hetao
* @param key
* @param seconds
* @return
*/
public boolean lock(String key,int seconds) {
boolean result = false;
if(seconds<=0){
seconds=1;
}
ShardedJedis jedis = jedisPool.getResource();
if (jedis == null) {
return result;
}
try {
//当且仅当key不存在将key的值设置为value并且返回1若是给定的key已经存在则setnx不做任何动作返回0
Long setResult = jedis.setnx(key,String.valueOf(System.currentTimeMillis()));
if(null != setResult && setResult == 1L)
{
jedis.expire(key, seconds);
result = true;
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
jedisPool.returnResource(jedis);
}
return result;
}
public boolean exists(String key) {
boolean result = false;
ShardedJedis jedis = jedisPool.getResource();
if (jedis == null) {
return result;
}
try {
result = jedis.exists(key);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
jedisPool.returnResource(jedis);
}
return result;
}
public void zadd(String key, double score, String member) {
ShardedJedis jedis = jedisPool.getResource();
try {
jedis.zadd(key, score, member);
} finally {
jedisPool.returnResource(jedis);
}
}
public void zincrby(String key, double score, String member) {
ShardedJedis jedis = jedisPool.getResource();
try {
jedis.zincrby(key, score, member);
} finally {
jedisPool.returnResource(jedis);
}
}
public Set<KeyValue<String, Double>> zRange(String key, double min, double max) {
ShardedJedis jedis = jedisPool.getResource();
try {
Set<Tuple> oriSet = jedis.zrangeByScoreWithScores(key, min, max);
Set<KeyValue<String, Double>> kvSets = new HashSet<>(oriSet.size());
for (Tuple oneTuple : oriSet) {
KeyValue<String, Double> oneItem = new KeyValue<String, Double>(oneTuple.getElement(), oneTuple.getScore());
kvSets.add(oneItem);
}
return kvSets;
} finally {
jedisPool.returnResource(jedis);
}
}
public Long zrem(String key, String member) {
ShardedJedis jedis = jedisPool.getResource();
try {
return jedis.zrem(key, member);
} finally {
jedisPool.returnResource(jedis);
}
}
public Double zscore(String key, String member) {
ShardedJedis jedis = jedisPool.getResource();
try {
return jedis.zscore(key, member);
} finally {
jedisPool.returnResource(jedis);
}
}
public void put(String key, String field, String value) {
ShardedJedis jedis = jedisPool.getResource();
try {
jedis.hset(key, field, value);
} finally {
jedisPool.returnResource(jedis);
}
}
public String get(String key, String field) {
ShardedJedis jedis = jedisPool.getResource();
try {
return jedis.hget(key, field);
} finally {
jedisPool.returnResource(jedis);
}
}
public boolean remove(String key, String field) {
ShardedJedis jedis = jedisPool.getResource();
try {
return jedis.hdel(key, field) > 0;
} finally {
jedisPool.returnResource(jedis);
}
}
public boolean setNx(String key, String value) {
ShardedJedis jedis = jedisPool.getResource();
try {
// 当且仅当key不存在将key的值设置为value并且返回1若是给定的key已经存在则setnx不做任何动作返回0。
Long lockFlag = jedis.setnx(key, value);
if (lockFlag.intValue() == 0) {
return false;
}
return true;
} finally {
jedisPool.returnResource(jedis);
}
}
public boolean setNx(String key, String value, int expireSeconds) {
ShardedJedis jedis = jedisPool.getResource();
try {
// 当且仅当key不存在将key的值设置为value并且返回1若是给定的key已经存在则setnx不做任何动作返回0。
Long lockFlag = jedis.setnx(key, value);
if (lockFlag.intValue() == 0) {
return false;
}
jedis.expire(key, expireSeconds);
return true;
} finally {
jedisPool.returnResource(jedis);
}
}
public void expireKey(String key, int expireSeconds) {
ShardedJedis jedis = jedisPool.getResource();
try {
jedis.expire(key, expireSeconds);
} finally {
jedisPool.returnResource(jedis);
}
}
public int ttl(String key) {
ShardedJedis jedis = jedisPool.getResource();
try {
Long leftSeconds = jedis.ttl(key);
if (leftSeconds == null) {
return 0;
}
return leftSeconds.intValue();
} finally {
jedisPool.returnResource(jedis);
}
}
@Override
public void afterPropertiesSet() throws Exception {
GenericObjectPool.Config config = new GenericObjectPool.Config();
config.testOnBorrow = Boolean.parseBoolean(testOnBorrow);
config.testOnReturn = Boolean.valueOf(testOnReturn);
config.testWhileIdle = Boolean.valueOf(testWhileIdle);
config.maxIdle = Integer.valueOf(maxIdle);
config.minIdle = Integer.valueOf(minIdle);
config.maxActive = Integer.valueOf(maxActive);
config.maxWait = Long.valueOf(maxWait);
config.numTestsPerEvictionRun = Integer
.valueOf(numTestsPerEvictionRun);
config.timeBetweenEvictionRunsMillis = Integer
.valueOf(timeBetweenEvictionRunsMillis);
config.minEvictableIdleTimeMillis = Integer
.valueOf(minEvictableIdleTimeMillis);
int timeout = Integer.valueOf(this.timeout);
List<JedisShardInfo> addressList = new ArrayList<JedisShardInfo>();
String[] address_arr = address.split(";");
if (ObjectUtils.isEmpty(address_arr)) {
logger.error("redis.address 不能为空! ");
return;
}
for (int i = 0; i < address_arr.length; i++) {
String[] address = address_arr[i].split(":");
if (address == null || address.length != 2) {
logger.error("redis.address 配置不正确!");
return;
}
String host = address[0];
int port = Integer.valueOf(address[1]);
logger.info("redis服务器" + (i + 1) + "的地址为: " + host + ":" + port);
// JedisShardInfo jedisShardInfo = new JedisShardInfo(host, port, timeout);
// jedisShardInfo.setPassword("efwh23jekdhwdefe2");
// addressList.add(jedisShardInfo);
addressList.add(new JedisShardInfo(host, port, timeout));
}
jedisPool = new ShardedJedisPool(config, addressList);
ParserConfig.getGlobalInstance().setAutoTypeSupport(true);//开启fastjson白名单
logger.info("redis对象池初始化完毕!");
}
public void sadd(String key, String member) {
ShardedJedis jedis = jedisPool.getResource();
try {
jedis.sadd(key, member);
} finally {
jedisPool.returnResource(jedis);
}
}
public Long srem(String key, String member) {
ShardedJedis jedis = jedisPool.getResource();
try {
return jedis.srem(key, member);
} finally {
jedisPool.returnResource(jedis);
}
}
@Override
public void destroy() throws Exception {
jedisPool.destroy();
}
public void setAddress(@Nullable String address) {
this.address = address;
}
public void setTestOnBorrow(@Nullable String testOnBorrow) {
this.testOnBorrow = testOnBorrow;
}
public void setTestOnReturn(@Nullable String testOnReturn) {
this.testOnReturn = testOnReturn;
}
public void setTestWhileIdle(@Nullable String testWhileIdle) {
this.testWhileIdle = testWhileIdle;
}
public void setMaxIdle(@Nullable String maxIdle) {
this.maxIdle = maxIdle;
}
public void setMinIdle(@Nullable String minIdle) {
this.minIdle = minIdle;
}
public void setMaxActive(@Nullable String maxActive) {
this.maxActive = maxActive;
}
public void setMaxWait(@Nullable String maxWait) {
this.maxWait = maxWait;
}
public void setTimeout(@Nullable String timeout) {
this.timeout = timeout;
}
public void setNumTestsPerEvictionRun(@Nullable String numTestsPerEvictionRun) {
this.numTestsPerEvictionRun = numTestsPerEvictionRun;
}
public void setTimeBetweenEvictionRunsMillis(@Nullable String timeBetweenEvictionRunsMillis) {
this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
}
public void setMinEvictableIdleTimeMillis(@Nullable String minEvictableIdleTimeMillis) {
this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
}
public void incr(String key) {
ShardedJedis jedis = jedisPool.getResource();
try {
jedis.incr(key);
} finally {
jedisPool.returnResource(jedis);
}
}
}

View File

@@ -0,0 +1,285 @@
package project.redis.interal;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.TaskExecutor;
import kernel.util.ThreadUtils;
import project.redis.RedisHandler;
import redis.clients.jedis.ShardedJedis;
public class RedisHandlerImpl implements RedisHandler, InitializingBean, Runnable {
private Logger logger = LoggerFactory.getLogger(this.getClass().getName());
private Redis redis;
private TaskExecutor taskExecutor;
/*
* Object get set dell
*/
/**
* get
*
* @param key
*/
public Object get(String key) {
return redis.get(key);
}
@Override
public String getString(String key) {
return redis.getString(key);
}
/**
* 批量get与单个get存在性能区别一次连接redispool遍历取到数据后返回
*/
public Object[] getList(String[] keys) {
return redis.getList(keys);
}
/**
* set 同步
*
* @param key
* @param object
*/
public void setSync(String key, Object object) {
redis.setSync(key, object);
}
/**
* set 批量同步
*
* @param params 需要写入的 k-v 数据
*/
public void setBatchSync(Map<String, Object> params) {
redis.setBatchSync(params);
}
/**
* set 异步
*
* @param key
* @param object
*/
public void setAsyn(String key, Object object) {
AsynItem item = new AsynItem(key, object, AsynItem.TYPE_MAP);
AsynItemQueue.add(item);
}
public void remove(String key) {
redis.remove(key);
}
/*
* 队列Queue push put
*/
/**
* push 同步
*
*/
public void pushSync(String key, Object object) {
redis.pushSync(key, object);
}
/**
* push 异步。批量处理在业务里考虑
*
*/
public void pushAsyn(String key, Object object) {
AsynItem item = new AsynItem(key, object, AsynItem.TYPE_QUEUE);
AsynItemQueue.add(item);
}
/**
* push 异步。批量处理在业务里考虑
*
*/
public void pushBatchAsyn(List<Map<String, Object>> params) {
redis.pushBatchSync(params);
}
/**
* 从队列尾取一个Object
*
*/
public Object poll(String key) {
return redis.poll(key);
}
@Override
public void setSyncLong(String key, long object) {
redis.setSyncLong(key,object);
}
@Override
public void setSyncString(String key, String object) {
redis.setSyncString(key,object);
}
@Override
public void setSyncStringEx(String key, String object, int time){
redis.setSyncStringEx(key, object, time);
}
public void zadd(String key, double score, String member) {
redis.zadd(key, score, member);
}
public void zincrby(String key, double score, String member) {
redis.zincrby(key, score, member);
}
public Set<KeyValue<String, Double>> zRange(String key, double min, double max) {
return redis.zRange(key, min, max);
}
public Long zrem(String key, String member) {
return redis.zrem(key, member);
}
public Double zscore(String key, String member) {
return redis.zscore(key, member);
}
public void put(String key, String field, String value) {
redis.put(key, field, value);
}
public String get(String key, String field) {
return redis.get(key, field);
}
public boolean remove(String key, String field) {
return redis.remove(key, field);
}
@Override
public boolean lock(String key, int seconds) {
return redis.lock(key,seconds);
}
@Override
public boolean exists(String key) {
return redis.exists(key);
}
@Override
public void incr(String key) {
redis.incr(key);
}
@Override
public boolean setNx(String key, String value, int timeoutSeconds) {
return redis.setNx(key, value, timeoutSeconds);
}
@Override
public void expireKey(String key, int seconds) {
redis.expireKey(key, seconds);
}
@Override
public int ttl(String key) {
return redis.ttl(key);
}
@Override
public void sadd(String key, String member) {
redis.sadd(key, member);
}
@Override
public void srem(String key, String member) {
redis.srem(key, member);
}
/**
* 服务运行: 1. 从消息队列获取message 2.调用currentProvider发送短信
*/
public void run() {
List<AsynItem> list = new ArrayList<AsynItem>();
while (true) {
try {
AsynItem item = AsynItemQueue.poll();
if (item != null) {
list.add(item);
}
if ((item == null && list.size() > 0) || list.size() >= 100) {
taskExecutor.execute(new HandleRunner(list));
list = new ArrayList<AsynItem>();
}
if (item == null) {
ThreadUtils.sleep(50);
}
} catch (Throwable e) {
logger.error("RedisHandlerImpl taskExecutor.execute() fail", e);
}
}
}
public class HandleRunner implements Runnable {
private List<AsynItem> list;
public HandleRunner(List<AsynItem> list) {
this.list = list;
}
public void run() {
try {
Map<String, Object> params_map = new ConcurrentHashMap<String, Object>();
List<Map<String, Object>> params_queue = new ArrayList<Map<String, Object>>();
for (int i = 0; i < list.size(); i++) {
AsynItem item = list.get(i);
if (AsynItem.TYPE_MAP.equals(item.getType())) {
params_map.put(item.getKey(), item.getObject());
} else if (AsynItem.TYPE_QUEUE.equals(item.getType())) {
Map<String, Object> map = new HashMap<String, Object>();
map.put(item.getKey(), item.getObject());
params_queue.add(map);
}
}
if (params_map.size() > 0) {
redis.setBatchSync(params_map);
}
if (params_queue.size() > 0) {
redis.pushBatchSync(params_queue);
}
} catch (Throwable t) {
logger.error("RedisHandlerImpl taskExecutor.execute() fail", t);
}
}
}
public void afterPropertiesSet() throws Exception {
new Thread(this, "RedisHandlerImplServer").start();
if (logger.isInfoEnabled()) {
logger.info("启动Redis(RedisHandlerImplServer)服务!");
}
}
public void setRedis(Redis redis) {
this.redis = redis;
}
public Redis getRedis() {
return redis;
}
public void setTaskExecutor(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
}

View File

@@ -0,0 +1,41 @@
package redis.clients.jedis;
import redis.clients.jedis.exceptions.JedisDataException;
public class Response<T> {
protected T response = null;
private boolean built = false;
private boolean set = false;
private Builder<T> builder;
private Object data;
public Response(Builder<T> b) {
this.builder = b;
}
public void set(Object data) {
this.data = data;
set = true;
}
public T get() {
if (!set) {
throw new JedisDataException(
"Please close pipeline or multi block before calling this method.");
}
if (!built) {
// 数据为空是直接返回null避免 builder.build(data)产生空指针异常
if(data == null) {
return null;
}
response = builder.build(data);
this.data = null;
built = true;
}
return response;
}
public String toString() {
return "Response " + builder.toString();
}
}