Sending Hive Detail Data by Email with Python
Published: 2018-03-28
Technology: Python 2.7 + Hive
Runtime environment: Python 2.7, Hadoop 2.x
Overview
A client needs particular data synced to them on a regular schedule. This article implements a simple sync program in Python: a single configuration file is all it takes to set up a data sync job.
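The "regular schedule" part is handled by the crond service rather than by the program itself. A crontab entry of roughly the following shape would run the job every Monday at 08:00; the install paths here are illustrative, not taken from the article:

```shell
# m h dom mon dow  command
0 8 * * 1  /usr/bin/python -u /opt/hive-emailjob/bin/hive-emailjob.py /etc/pythoncfg/emailjob.ini
```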
Details
# 1. Requirements

The client needs to receive specific activity data every Monday, generated as an Excel or CSV file and delivered by email to designated recipients. A first pass over the requirement leads to these conclusions:

- 1. The data the client wants is not complex and needs no special processing; it can be treated simply as the output of a SQL query.
- 2. Writing query results to a CSV file and sending email are both mature, highly reusable techniques.
- 3. The crond service on Linux supports scheduled jobs.

# 2. System requirements

- Linux CentOS 6.x
- Hadoop 2.x
- Python 2.7

# 3. Python dependencies

- PyHive
- ppytools2
- thrift
- thrift-sasl
- sasl

# 4. Workflow

![Hive-Email-Job-01.png](http://7xi700.com1.z0.glb.clouddn.com/Hive-Email-Job-01.png-alias "Hive-Email-Job-01.png")

# 5. Implementation

> The project structure is shown below:

![Hive-Email-Job-02.png](http://7xi700.com1.z0.glb.clouddn.com/Hive-Email-Job-02.png-alias "Hive-Email-Job-02.png")

> The key implementation steps are as follows:

### 1. Global configuration `settings.py`

```python
# -*- coding: utf-8 -*-
# __author__ = 'elkan1788@gmail.com'

from ppytools.cfgreader import ConfReader

import logging

'''Logging output format
'''
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s [%(levelname)s] {%(name)-10s} - %(message)s')


class ProjectConfig(object):
    """ProjectConfig
    """

    def __init__(self, *conf_paths):
        self.cr = ConfReader(*conf_paths)

    def getHiveConf(self):
        return self.cr.getValues('HiveServer')

    def getEmailServer(self):
        return self.cr.getValues('EmailServer')

    def getJobInfo(self):
        return self.cr.getValues('JobInfo')

    def getCSVFolder(self):
        return self.cr.getValues('CSVFolder')['folder']

    def getCSVFile(self):
        return self.cr.getValues('CSVFile')

    def getCSVHead(self):
        return self.cr.getValues('CSVHead')

    def getHQLScript(self):
        return self.cr.getValues('HQLScript')

    def getEmailInfo(self):
        return self.cr.getValues('EmailInfo')
```

### 2. Core logic `main.py`

```python
# -*- coding: utf-8 -*-
# __author__ = 'elkan1788@gmail.com'

from hiveemailjob.settings import ProjectConfig
from ppytools.csvhelper import write
from ppytools.emailclient import EmailClient
from ppytools.hiveclient import HiveClient
from ppytools.lang.timerhelper import timeMeter

import datetime
import logging
import sys
import time

logger = logging.getLogger(__name__)


def build_rk(ts):
    """Build HBase row key value

    :param ts: date time
    :return: row key (hex-encoded millisecond timestamp)
    """
    return hex(int(time.mktime(ts.timetuple())*1000))[2:]


def email_att(folder, name):
    return '{}/{}_{}.csv'.format(folder, name,
                                 datetime.datetime.now().strftime('%Y%m%d%H%M%S'))


@timeMeter()
def run(args):
    """Email Job program execute entrance

    :param args:
        1. job file path
        2. start time, format: 2018-01-30 17:09:38 (optional)
        3. stop time (optional)
    :return: Empty
    """

    '''Read system args start
    '''
    args_len = len(args)
    if args_len != 2 and args_len != 4:
        logger.error('Input args are wrong. Please check!!!')
        logger.error('1: job file path.')
        logger.error('2: start time, format: 2018-01-30 17:09:38 (optional)')
        logger.error('3: stop time (optional)')
        sys.exit(1)
    elif args_len == 4:
        try:
            start_time = datetime.datetime.strptime(args[2], '%Y-%m-%d %H:%M:%S')
            stop_time = datetime.datetime.strptime(args[3], '%Y-%m-%d %H:%M:%S')
        except Exception, e:
            raise RuntimeError('Parse start or stop time failed!!!\n', e)
    else:
        stop_time = datetime.date.today()
        start_time = stop_time - datetime.timedelta(days=1)

    job_file = args[1]
    start_rk = build_rk(start_time)
    stop_rk = build_rk(stop_time)

    '''System settings files (hard coded)
    '''
    hive_conf = '/etc/pythoncfg/hive.ini'
    email_conf = '/etc/pythoncfg/email.ini'

    sets = ProjectConfig(hive_conf, email_conf, job_file)
    job_info = sets.getJobInfo()
    csv_folder = sets.getCSVFolder()

    logger.info('Now running %s Email Job...', job_info['title'])
    logger.info('Start time: %s', start_time)
    logger.info('Stop time: %s', stop_time)

    hc = HiveClient(**sets.getHiveConf())

    csv_file = sets.getCSVFile().items()
    csv_file.sort()
    file_list = []
    logger.info('File name list: ')
    for (k, v) in csv_file:
        logging.info('%s: %s', k, v)
        file_list.append(v)

    csv_head = sets.getCSVHead().items()
    csv_head.sort()
    head_list = []
    logger.info('CSV file head list: ')
    for (k, v) in csv_head:
        logging.info('%s: %s', k, v)
        head_list.append(v)

    hql_scripts = sets.getHQLScript().items()
    hql_scripts.sort()
    email_atts = []
    index = 0
    for (k, hql) in hql_scripts:
        logging.info('%s: %s', k, hql)
        '''Place your own business logic here.
        '''
        result, size = hc.execQuery(hql.format(start_rk, stop_rk))
        if size == 0:
            logging.info('The above HQL script did not find any data!!!')
        else:
            csv_file = email_att(csv_folder, file_list[index])
            email_atts.append(csv_file)
            write(csv_file, head_list[index].split(','), result)
        index += 1

    '''Close the Hive Server connection.
    '''
    hc.closeConn()

    email_sub = sets.getEmailInfo()['subject'] % start_time
    email_body = sets.getEmailInfo()['body']
    email_to = sets.getEmailInfo()['to'].split(';')
    email_cc = sets.getEmailInfo()['cc'].split(';')

    if len(email_atts) == 0:
        # Prefix the body with "Sorry, no data was found this time."
        email_body = '抱歉当前未找到任何数据。\n\n' + email_body

    ec = EmailClient(**sets.getEmailServer())
    ec.send(email_to, email_cc, email_sub, email_body, email_atts, False)
    ec.quit()

    logger.info('Finished %s Email Job.', job_info['title'])
```

### 3. System configuration files `hive.ini` and `email.ini`

```ini
# /etc/pythoncfg/hive.ini
[HiveServer]
host=127.0.0.1
port=10000
user=hive
db=default

# /etc/pythoncfg/email.ini
[EmailServer]
server=mail.163.com
port=25
user=elkan1788@gmail.com
passwd=xxxxxx
mode=TSL
```

**Note:** both files must be placed in the `/etc/pythoncfg/` directory as shown above.

### 4. Email job configuration reference `emailjob.ini`

```ini
[JobInfo]
title=邮件报表任务测试

[CSVFolder]
folder=/opt/csv_files/

# Please note that CSVFile, CSVHead and HQLScript must have the same length.
# It is suggested to use a prefix+number naming scheme to keep them aligned.
[CSVFile]
file1=省份分组统计
file2=城市分组统计

[CSVHead]
head1=省份,累计
head2=省份,城市,累计

[HQLScript]
script1=select cn_state,count(1) m from ext_act_ja1
script2=select cn_state,cn_city,count(1) m from ext_act_ja2

[EmailInfo]
to=elkan1788@gmail.com;
cc=2292706174@qq.com;

# %s will be replaced with the start date.
subject=%s区域抽奖统计[测试]
body=此邮件由系统自动发送,请勿回复,谢谢!
```

**Note:** the CSVFile, CSVHead and HQLScript sections must contain the same number of entries, in the same order; naming the keys with a prefix plus a number is recommended.

### 5. Bin file `hive-emailjob.py`

```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = 'elkan1788@gmail.com'

from hiveemailjob import main

import sys

if __name__ == '__main__':
    main.run(sys.argv)
```

### 6. Running the job

Type `python -u bin/hive-emailjob.py` in a terminal; the output looks like this:

```
2018-02-20 16:28:21,561 [INFO] {__main__  } - Now running 邮件报表任务测试 Email Job...
2018-02-20 16:28:21,561 [INFO] {__main__  } - Start time: 2018-02-22
2018-02-20 16:28:21,562 [INFO] {__main__  } - Stop time: 2018-02-20
2018-02-20 16:28:21,691 [INFO] {pyhive.hive} - USE `default`
2018-02-20 16:28:21,731 [INFO] {ppytools.hive_client} - Hive server connect is ready. Transport open: True
2018-02-20 16:28:31,957 [INFO] {ppytools.email_client} - Email SMTP server connect ready.
2018-02-20 16:28:31,957 [INFO] {root      } - File name list:
2018-02-20 16:28:31,957 [INFO] {root      } - file1: 省份分组统计
2018-02-20 16:28:31,957 [INFO] {root      } - file2: 城市分组统计
2018-02-20 16:28:31,957 [INFO] {root      } - CSV file head list:
2018-02-20 16:28:31,957 [INFO] {root      } - head1: 省份,累计
2018-02-20 16:28:31,957 [INFO] {root      } - head2: 省份,城市,累计
2018-02-20 16:28:31,957 [INFO] {root      } - script1: select cn_state,count(1) m from ext_act_ja2
2018-02-20 16:28:31,958 [INFO] {pyhive.hive} - select cn_state,count(1) m from ext_act_ja2
2018-02-20 16:29:04,258 [INFO] {ppytools.hive_client} - Hive client query completed. Records found: 31
2018-02-20 16:29:04,259 [INFO] {ppytools.lang.timer_helper} - Execute method cost 32.3012499809 seconds.
2018-02-20 16:29:04,261 [INFO] {ppytools.csv_helper} - Write a CSV file successful. --> /opt/csv_files/省份分组统计_20180223162904.csv
2018-02-20 16:29:04,262 [INFO] {ppytools.lang.timer_helper} - Execute method cost 0.00222992897034 seconds.
2018-02-20 16:29:04,262 [INFO] {root      } - script2: select cn_state,cn_city,count(1) m from ext_act_ja2
2018-02-20 16:29:04,262 [INFO] {pyhive.hive} - select cn_state,cn_city,count(1) m from ext_act_ja2
2018-02-20 16:29:23,462 [INFO] {ppytools.hive_client} - Hive client query completed. Records found: 367
2018-02-20 16:29:23,463 [INFO] {ppytools.lang.timer_helper} - Execute method cost 19.2005498409 seconds.
2018-02-20 16:29:23,465 [INFO] {ppytools.csv_helper} - Write a CSV file successful. --> /opt/csv_files/城市分组统计_20180223162923.csv
2018-02-20 16:29:23,465 [INFO] {ppytools.lang.timer_helper} - Execute method cost 0.00227284431458 seconds.
2018-02-20 16:29:23,669 [INFO] {ppytools.email_client} - Send email[2018-02-22区域抽奖统计[测试]] success. To users: elkan1788@163.com.
2018-02-20 16:29:23,669 [INFO] {ppytools.lang.timer_helper} - Execute method cost 0.204078912735 seconds.
2018-02-20 16:29:23,714 [INFO] {__main__  } - Finished 邮件报表任务测试 Email Job.
2018-02-20 16:29:23,715 [INFO] {ppytools.lang.timer_helper} - Execute method cost 62.1566159725 seconds.
```

And that's it: a reusable data file sync program is complete.
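The row-key helper in `main.py` deserves a closer look: it turns a datetime into a hex-encoded millisecond timestamp, which is the row-key scheme the underlying HBase tables apparently use. A minimal standalone sketch (Python 3 syntax; `build_rk` is re-stated here exactly as in `main.py`):

```python
import datetime
import time

def build_rk(ts):
    """Hex-encode a datetime as a millisecond-timestamp row key."""
    # mktime() yields whole seconds; multiply by 1000 for milliseconds,
    # then strip the leading '0x' that hex() adds.
    return hex(int(time.mktime(ts.timetuple()) * 1000))[2:]

stop = datetime.date(2018, 2, 20)
start = stop - datetime.timedelta(days=1)
print(build_rk(start), build_rk(stop))
```

Because the keys for nearby dates have the same hex width, they preserve chronological order, which is what makes range scans between `start_rk` and `stop_rk` work.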
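`ConfReader` comes from the author's ppytools2 package and its internals are not shown in the article. For readers without that dependency, its "merge several INI files, then read a section as a dict" behaviour can be approximated with the standard library. This is a sketch under that assumption, not the actual ppytools implementation; the section and key names follow the article's `emailjob.ini`, with English placeholder values:

```python
# -*- coding: utf-8 -*-
import io
import os
import tempfile

try:
    from configparser import ConfigParser   # Python 3
except ImportError:
    from ConfigParser import ConfigParser   # Python 2

def read_sections(*conf_paths):
    """Merge several INI files; later files override earlier ones."""
    cp = ConfigParser()
    cp.read(conf_paths)
    return {s: dict(cp.items(s)) for s in cp.sections()}

# A job file with the prefix+number key convention from the article.
job_ini = u"""
[CSVFile]
file1=province_stats
file2=city_stats

[CSVHead]
head1=province,total
head2=province,city,total

[HQLScript]
script1=select cn_state,count(1) m from ext_act_ja1
script2=select cn_state,cn_city,count(1) m from ext_act_ja2
"""

path = os.path.join(tempfile.mkdtemp(), 'emailjob.ini')
with io.open(path, 'w', encoding='utf-8') as f:
    f.write(job_ini)

conf = read_sections(path)

# Sorting the prefix+number keys keeps the three sections aligned,
# which is exactly why the article recommends that naming convention.
files = [v for _, v in sorted(conf['CSVFile'].items())]
heads = [v for _, v in sorted(conf['CSVHead'].items())]
hqls = [v for _, v in sorted(conf['HQLScript'].items())]
for name, head, hql in zip(files, heads, hqls):
    print(name, '->', head.split(','))
```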
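Likewise, `EmailClient.send(to, cc, subject, body, attachments, ...)` is a ppytools wrapper whose code is not shown. What such a client has to do, building a multipart message with CSV attachments, can be sketched with the standard library alone; the addresses, subject, and file below are placeholders, not values from the article:

```python
import os
import tempfile
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

def build_message(sender, to, cc, subject, body, att_paths):
    """Assemble a plain-text email with file attachments."""
    msg = MIMEMultipart()
    msg['From'] = sender
    msg['To'] = ', '.join(to)
    msg['Cc'] = ', '.join(cc)
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain', 'utf-8'))
    for path in att_paths:
        with open(path, 'rb') as f:
            part = MIMEApplication(f.read(), Name=os.path.basename(path))
        part['Content-Disposition'] = ('attachment; filename="%s"'
                                       % os.path.basename(path))
        msg.attach(part)
    return msg

# Demo with a throwaway CSV file.
csv_path = os.path.join(tempfile.mkdtemp(), 'stats.csv')
with open(csv_path, 'w') as f:
    f.write('province,total\nZhejiang,31\n')

msg = build_message('job@example.com', ['a@example.com'], ['b@example.com'],
                    '2018-02-22 regional stats [test]',
                    'Automated mail, do not reply.', [csv_path])
```

Actually sending it would then be roughly `smtplib.SMTP(server, port)`, `starttls()` (the article's `mode=TSL` presumably means TLS), `login(...)`, `sendmail(...)`, `quit()`, which is what `EmailClient` appears to encapsulate.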
Author: 凡梦星尘