第一步
配置文件redis.conf
cd /usr/apps/redis
vim redis.conf
先输入:set nu 打开行号标识
69行:bind 127.0.0.1
加上注释,取消IP绑定,否则其他主机不能连接
88行:protected-mode no
将yes修改为no,关闭保护模式,否则其他主机不能连接
136行:daemonize yes
将no改为yes,允许后台启动,否则只能前台启动,该进程是阻塞进程,即打开之后不能关闭窗口,若想做其他操作,只能重新开启窗口操作。
500行: requirepass
设置密码,删除注释打开该选项,后面输入密码【可做可不做】,下方提到的均为该配置
第二步
启动redis
redis的启动文件默认安装在了/usr/local/bin
/中
开启redis服务
redis-server /usr/apps/redis/redis.conf
打开redis服务端
redis-cli
效果图
第三步
使用Flink进行连接
package Flink写入Redis.Flink写入redis
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.{FlinkJedisConfigBase, FlinkJedisPoolConfig}
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
case class SensorReading(id:String,timestamp:Long,temperature:Double)
object RedisSinkTest {
def main(args: Array[String]): Unit = {
val env=StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//读取数据
val input="C:\\Users\\Lkc\\IdeaProjects\\SparkTest\\src\\main\\resources\\sensor.txt"
val inputStream=env.readTextFile(input)
//先转换成样例类类型
val dataStream=inputStream
.map(data=>{
val arr=data.split(",")
SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
})
//定义一个FlinkJedisCOnfigBase
val conf=new FlinkJedisPoolConfig.Builder()
.setHost("192.168.38.144")
.setPort(6379)
.setPassword("123456")
.build()
dataStream.addSink(new RedisSink[SensorReading](conf,new MyRedisMapper()))
env.execute("redis sink test")
}
}
// 定义一个RedisMapper
class MyRedisMapper extends RedisMapper[SensorReading]{
// 定义保存数据写入redis的命令,HSET 表名 key value
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET, "sensor_temp")
}
// 将温度值指定为value
override def getValueFromData(data: SensorReading): String = data.temperature.toString
// 将id指定为key
override def getKeyFromData(data: SensorReading): String = data.id
}
第四步
在redis客户端中进行查询
注意:如果遇到此情况,说明redis设置密码了
输入:auth 密码
即可
例如
成功进入之后进行查询数据
源数据
在redis客户端中查询数据:
为何数据量不够呢?
因为相同的id,值会进行覆盖,所以value值是最后一次出现的数值。就像HashMap一样,遇到相同的值会进行覆盖。