11.4 Spark与Redis整合原理与实战
本节通过生产环境实战案例讲解Spark与Redis整合原理及Spark与Redis整合实战。
11.4.1 Spark与Redis整合原理
Redis是一个开源项目(BSD许可)。Redis以内存数据结构存储,用作数据库、缓存和消息代理。它支持数据结构,如字符串、散列、列表、集合、具有范围查询的排序集、位图、超文本和具有半径查询的地理空间索引等。Redis内置复制、Lua脚本、LRU eviction、事务和不同级别的磁盘持久性,通过Redis Sentinel提供高可用性,并通过Redis Cluster进行自动分区。
Redis可以对这些类型运行原子操作,如附加到字符串、在哈希中增加值、将元素推送到列表中、计算集交集、联合与差异于一体;或者在排序集中获得最高排名的成员。
为了实现其卓越的性能,Redis使用内存中的数据集。根据业务用例,可以通过将数据集一次性转储到磁盘中,或通过将每个命令附加到日志来持久化。如果只需要功能丰富的网络内存缓存,则可以选择禁用持久性。
Redis还支持简单的主从异步复制,第一次同步非阻塞的速度非常快,网络切分传输时可自动重连同步。
Redis的其他功能包括:
事务Transactions。
发布/订阅。
Lua脚本。
Keys with a limited time-to-live。
LRU eviction of keys。
自动故障切换。
大多数编程语言可以使用Redis。
Redis以ANSI C编写,适用于大多数POSIX系统,如Linux、* BSD、OS X,无需外部依赖。Linux和OS X是Redis开发和测试的两个操作系统,建议使用Linux进行部署。Redis可能在诸如SmartOS的Solaris衍生系统中工作,但支持是尽力而为的。没有官方支持Windows版本,但是Microsoft开发并维护了Redis的Win-64端口。
11.4.2 Spark与Redis整合实战
本节以生产环境中Spark与Redis整合实战案例来讲解。在通信运营商的Spark大数据项目中,Spark每分钟实时读取Hdfs中的话单数据,经过业务逻辑代码分析转换后,将清洗以后的数据转换成一个List字符串列表,然后遍历List字符串列表,将每条记录拼接成一个Key-Value字符串放入Redis队列。
Spark与Redis整合实战案例实现步骤如下。
(1)通过Maven方式下载Redis的jedis 2.6.0 Jar包。
pom.xml文件增加以下内容:
1. <dependency> 2. <groupId>redis.clients</groupId> 3. <artifactId>jedis</artifactId> 4. <version>2.6.0</version> 5. </dependency>
在Spark中导入Redis的Jar包。
1. import redis.clients.jedis.Jedis; 2. import redis.clients.jedis.JedisPool; 3. import redis.clients.jedis.JedisPoolConfig;
(2)在项目config.properties配置文件中增加Redis的主机地址、端口、密码,以及Redis连接池分配的连接数、等待时间等信息。
1. ## REDIS 2. redis.ip=100.*.*.100 3. redis.port=6379 4. redis.password=password 5. ...... 6. ## redis 7. #最大分配的对象数 8. redis.pool.maxTotal=1024 9. #最大能够保持idle状态的对象数 10. redis.pool.maxIdle=200 11. #当池内没有返回对象时,最大等待时间 12. redis.pool.maxWait=1000 13. #当调用borrow Object方法时,是否进行有效性检查 14. redis.pool.testOnBorrow=true 15. #当调用return Object方法时,是否进行有效性检查 16. redis.pool.testOnReturn=true 17.
(3)编写RedisServiceImpl实现类,从配置文件中获取Redis的主机地址、端口、密码等信息;编写getFromPool方法从redis访问池中获取redis实例;编写getSingle方法获取redis实例。
1. public class RedisServiceImpl { 2. private static final Map<String, String> REDIS_CONFIG = Config. getInstance().getRedisParams(); 3. private static final String REDIS_IP = REDIS_CONFIG.get("redis.ip"); 4. private static final String REDIS_PORT = REDIS_CONFIG.get("redis.port"); 5. private static final String REDIS_PASSWORD = REDIS_CONFIG.get("redis. password"); 6. private static final int REDIS_TIMEOUT = 2000; 7. private static Logger log = LoggerFactory.getLogger(RedisServiceImpl.class); 8. private static JedisPool pool; 9. /** * 从redis访问池中获取redis实例 * * @return redis实例 10. */ 11. public static Jedis getFromPool() { 12. if (pool == null) { 13. JedisPoolConfig config = new JedisPoolConfig(); 14. config.setMaxTotal(Integer.valueOf(REDIS_CONFIG.get("redis. pool.maxTotal"))); 15. config.setMaxIdle(Integer.valueOf(REDIS_CONFIG.get("redis. pool.maxIdle"))); 16. config.setMaxWaitMillis(Integer.valueOf(REDIS_CONFIG.get("redis. pool.maxWait"))); 17. config.setTestOnBorrow(Boolean.valueOf(REDIS_CONFIG.get("redis. pool.testOnBorrow"))); 18. config.setTestOnReturn(Boolean.valueOf(REDIS_CONFIG.get("redis. pool.testOnReturn"))); 19. pool = new JedisPool(config, REDIS_IP, Integer.valueOf(REDIS_ PORT), REDIS_TIMEOUT, REDIS_PASSWORD); 20. } 21. return pool.getResource(); 22. } 23. 24. public static void returnResource(Jedis redis) { 25. pool.returnResource(redis); 26. } 27. 28. /** * 获取redis实例 * * @return redis实例 29. */ 30. 31. public static Jedis getSingle() { 32. Jedis redis = new Jedis(REDIS_IP, Integer.valueOf(REDIS_PORT), REDIS_TIMEOUT); 33. redis.auth(REDIS_PASSWORD); 34. return redis; 35. } 36. }
(4)编写项目的业务代码,RedisBean类数据结构用于要存放Redis的数据。
1. public class RedisBean { 2. public void setKey(String key) { 3. this.key = key; 4. } 5. 6. public void setValue(String value) { 7. this.value = value; 8. } 9. 10. public String getKey() { 11. return key; 12. } 13. 14. public String getValue() { 15. return value; 16. } 17. 18. protected String key; 19. protected String value; 20. 21. }
根据项目的业务需求,将Spark中提取转换后的每条记录存入业务数据结构RedisBean的key、value中;然后加入到RedisBeanList列表中。RedisBeanList是List<RedisBean> RedisBeanList类型。
1. if ("RedisTestQUALINFO".equals(keyType)) { 2. RedisBean.key = "RedisTestQUALINFO"; 3. RedisBean.value = RedisTestReslut.toString(); 4. RedisBeanList.add(RedisBean); 5. }
(5)最终调用业务方法addToRedis,通过RedisServiceImpl.getSingle()获取Redis实例,然后循环遍历List<RedisBean>的每个元素,调用redis.lpush方法分别将Key值、Value值lpush到Redis中。lPush完成以后,通过redis.close关闭连接。
1. public static void addToRedis(List<RedisBean> redisBeanList) { 2. Jedis redis = RedisServiceImpl.getSingle(); 3. for (RedisBean redisData : redisBeanList) { 4. redis.lpush(redisData.getKey(), redisData.getValue()); 5. } 6. redis.close(); 7. }
(6)Redis业务验证:可以登录到Redis系统中,查询数据已持久化至Redis。
1. redis-cli –h 100.*.*.100 -p 6379 -a 'password' 2. select 0 ---切换到0库 3. keys * ---列出所有的key 4. lrange CDNNODEQUALINFO 0 -1 查看所有的记录