-
redisTemplate.opsForValue().get(key);
-
public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperations<K, V>, BeanClassLoaderAware {
AbstractOperations(RedisTemplate<K, V> template) {
this.template = template;
}
DefaultValueOperations(RedisTemplate<K, V> template) {
super(template);
}
private final ValueOperations<K, V> valueOps = new DefaultValueOperations<>(this);
/*
* (non-Javadoc)
* @see org.springframework.data.redis.core.RedisOperations#opsForValue()
*/
@Override
public ValueOperations<K, V> opsForValue() {
return valueOps;
}
@Nullable
public <T> T execute(RedisCallback<T> action, boolean exposeConnection) {
return execute(action, exposeConnection, false);
}
@Nullable
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
Assert.notNull(action, "Callback object must not be null");
RedisConnectionFactory factory = getRequiredConnectionFactory();
RedisConnection conn = null;
try {
if (enableTransactionSupport) {
// only bind resources in case of potential transaction synchronization
conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
} else {
conn = RedisConnectionUtils.getConnection(factory);
}
boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
RedisConnection connToUse = preProcessConnection(conn, existingConnection);
boolean pipelineStatus = connToUse.isPipelined();
if (pipeline && !pipelineStatus) {
connToUse.openPipeline();
}
RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
T result = action.doInRedis(connToExpose);
// close pipeline
if (pipeline && !pipelineStatus) {
connToUse.closePipeline();
}
// TODO: any other connection processing?
return postProcessResult(result, connToUse, existingConnection);
} finally {
RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);
}
}
}
-
class DefaultValueOperations<K, V> extends AbstractOperations<K, V> implements ValueOperations<K, V> {
DefaultValueOperations(RedisTemplate<K, V> template) {
super(template);
}
@Nullable
<T> T execute(RedisCallback<T> callback, boolean exposeConnection) {
return template.execute(callback, exposeConnection);
}
/*
* redis get 命令
*/
@Override
public V get(Object key) {
return execute(new ValueDeserializingRedisCallback(key) {
@Override
protected byte[] inRedis(byte[] rawKey, RedisConnection connection) {
return connection.get(rawKey);
}
}, true);
}
}
-
public abstract class RedisConnectionUtils {
public static RedisConnection getConnection(RedisConnectionFactory factory) {
return getConnection(factory, false);
}
public static RedisConnection getConnection(RedisConnectionFactory factory, boolean transactionSupport) {
return doGetConnection(factory, true, false, transactionSupport);
}
public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,
boolean transactionSupport) {
Assert.notNull(factory, "No RedisConnectionFactory specified");
RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);
if (connHolder != null) {
if (transactionSupport) {
potentiallyRegisterTransactionSynchronisation(connHolder, factory);
}
return connHolder.getConnection();
}
if (!allowCreate) {
throw new IllegalArgumentException("No connection found and allowCreate = false");
}
if (log.isDebugEnabled()) {
log.debug("Opening RedisConnection");
}
//获取 RedisConnection
RedisConnection conn = factory.getConnection();
if (bind) {
RedisConnection connectionToBind = conn;
if (transactionSupport && isActualNonReadonlyTransactionActive()) {
connectionToBind = createConnectionProxy(conn, factory);
}
connHolder = new RedisConnectionHolder(connectionToBind);
TransactionSynchronizationManager.bindResource(factory, connHolder);
if (transactionSupport) {
potentiallyRegisterTransactionSynchronisation(connHolder, factory);
}
return connHolder.getConnection();
}
return conn;
}
}
-
public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {
/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.RedisConnectionFactory#getConnection()
*/
public RedisConnection getConnection() {
if (isRedisClusterAware()) {
return getClusterConnection();
}
//获取redis 连接
Jedis jedis = fetchJedisConnector();
JedisConnection connection = (getUsePool() ? new JedisConnection(jedis, pool, getDatabase(), getClientName())
: new JedisConnection(jedis, null, getDatabase(), getClientName()));
connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);
return postProcessConnection(connection);
}
protected Jedis fetchJedisConnector() {
try {
if (getUsePool() && pool != null) {
return pool.getResource();
}
Jedis jedis = createJedis();
//强制初始化,这里会获取连接
// force initialization (see Jedis issue #82)
jedis.connect();
potentiallySetClientName(jedis);
return jedis;
} catch (Exception ex) {
throw new RedisConnectionFailureException("Cannot get Jedis connection", ex);
}
}
}
-
public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands,
AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {
public void connect() {
client.connect();
}
public class BinaryClient extends Connection {
@Override
public void connect() {
if (!isConnected()) {
super.connect();
if (user != null) {
auth(user, password);
getStatusCodeReply();
} else if (password != null) {
auth(password);
getStatusCodeReply();
}
if (db > 0) {
select(db);
getStatusCodeReply();
}
}
}
}
}
-
public class Connection implements Closeable {
public String getStatusCodeReply() {
flush();
final byte[] resp = (byte[]) readProtocolWithCheckingBroken();
if (null == resp) {
return null;
} else {
return SafeEncoder.encode(resp);
}
}
protected Object readProtocolWithCheckingBroken() {
if (broken) {
throw new JedisConnectionException("Attempting to read from a broken connection");
}
try {
return Protocol.read(inputStream);
} catch (JedisConnectionException exc) {
broken = true;
throw exc;
}
}
}
-
public final class Protocol {
public static Object read(final RedisInputStream is) {
return process(is);
}
private static Object process(final RedisInputStream is) {
final byte b = is.readByte();
switch (b) {
case PLUS_BYTE:
return processStatusCodeReply(is);
case DOLLAR_BYTE:
return processBulkReply(is);
case ASTERISK_BYTE:
return processMultiBulkReply(is);
case COLON_BYTE:
return processInteger(is);
case MINUS_BYTE:
processError(is);
return null;
default:
throw new JedisConnectionException("Unknown reply: " + (char) b);
}
}
}
public class RedisInputStream extends FilterInputStream {
public byte readByte() throws JedisConnectionException {
ensureFill();
return buf[count++];
}
private void ensureFill() throws JedisConnectionException {
if (count >= limit) {
try {
limit = in.read(buf);
count = 0;
if (limit == -1) {
throw new JedisConnectionException("Unexpected end of stream.");
}
} catch (IOException e) {
throw new JedisConnectionException(e);
}
}
}
}