1907 字
10 分钟
基于HIVE的疫情防控数据分析与可视化

使用的平台与技术:Hadoop-2.6.5、Jdk-1.8、Sqoop-1.4.7、Hive-1.2.1、MySql-5.6.24、

Idea2020、Maven-3.6.3

Pycharm2024、Django-3.0.3、Python-3.7.16、PyMysql-1.0.2

数据链接:covid_19_clean_complete

项目仓库:https://gitee.com/rongwu651/xg_hive 

博客梳理#

  1. 技术选型:使用 Hadoop、Jdk、Sqoop、Hive、MySql、Idea、Maven、Pycharm、Django、Python、PyMysql 等技术搭建项目。
  2. 数据处理流程
    • 创建 Hive 数据表:启动集群和 Hive 客户端后,创建存储疫情数据的表结构。
    • 数据上传至集群:通过hdfs dfs -put命令将本地数据上传到 HDFS。
    • 构建 Maven 项目:在 Idea 中创建 Maven 项目,添加 Hadoop 依赖,编写 Mapper、Reducer 和 Main 文件进行数据处理和计算。
    • 生成 Jar 包并执行任务:打包 Maven 项目生成 Jar 包,在集群上执行任务,处理后的数据可查看并导入 Hive 表。
    • 数据导入 MySql 表:利用 Sqoop 将数据导入 MySql 表(通过Django模型创建的表)。
  3. Django 项目搭建与配置:创建 Django 项目和应用,定义数据模型,生成并应用数据库迁移文件。
  4. 前后端数据交互与可视化:编写 view.py 和 urls.py 进行请求响应和映射,通过 html 文件和 jinja2 语法实现前后端数据交互,使用 echarts 进行数据可视化展示多种疫情数据图表。

创建Hive数据表#

1. 启动集群和Hive客户端

[root@hadoop100] start-dfs.sh[root@hadoop100] start-yarn.sh[root@hadoop100] hive

2. 创建表结构,行格式,分割字段以Tab为结尾

hive > create table xg(date_day string,siwanglv double,kangfulv double,huoyuebingli int,quezhen int,siwang int,huifu int,shenfeng string)row format delimited fields terminated by '\t';

数据上传至集群#

[root@hadoop100] hdfs dfs -put /local_path /hdfs_path

构建Maven项目#

1.  打开Idea-创建项目-Maven项目-选择maven-archetype-quickstart

2. pom.xml文件中添加Hadoop对应版本的依赖并刷新加载

<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.5</version></dependency><dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.6.5</version></dependency><dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.2.0</version></dependency>

3. 创建Mapper文件

public class XinguanMapper extends Mapper<LongWritable, Text,Text, NumberInfo>{ //读取写出类型
protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {
//按行读取数据,并进行分割存储
String line = value.toString();
Iterable split = Arrays.asList(line.split(","));
ArrayList datas = new ArrayList<>(10);
//存储
for (String s : split){
datas.add(s);
}
if(datas.size()>9){ //防止字符越界
//过滤,选取所需要的数据
if ("China".equals(datas.get(1))||"Taiwan*".equals(datas.get(1))){
String Province = "China".equals(datas.get(1))?datas.get(0) : "Taiwan";
String date = datas.get(4);//日期
Long Confirmed =Long.parseLong(datas.get(5)) ; //已确诊,累计
Long Deaths = Long.parseLong(datas.get(6)); //死亡,累计
Long Health = Long.parseLong(datas.get(7)); //恢复,累计
//写入
context.write(new Text(date),new NumberInfo(Confirmed,Deaths,Health,Province));
}
}
}
}

3.1 创建对象,便于存储Mapper阶段筛选后的多值,至上下文

实现Writable接口,使该类的对象能够在Hadoop MapReduce作业中作为键或值使用

Writable是Hadoop中用于序列化和反序列化数据的标准接口。

public class NumberInfo implements Writable {
private Long Confirmed; //已确诊,累计
private Long Deaths; //死亡,累计
private Long Health; //恢复,累计
private String Province; //省份
public NumberInfo() {}
public NumberInfo(Long confirmed, Long deaths, Long health, String province) {//有参构造
Confirmed = confirmed;
Deaths = deaths;
Health = health;
Province = province;
}
@Override
public void write(DataOutput dataOutput) throws IOException { //数据写出
dataOutput.writeLong(Confirmed);
dataOutput.writeLong(Deaths);
dataOutput.writeLong(Health);
dataOutput.writeUTF(Province);
}
@Override
public void readFields(DataInput dataInput) throws IOException {//数据读取
Confirmed = dataInput.readLong();
Deaths = dataInput.readLong();
Health = dataInput.readLong();
Province = dataInput.readUTF();
}
@Override
public String toString() {
return Confirmed + "\t" + Deaths + "\t" + Health + "\t" + Province;
}
}

4. 设置Reduce文件

public class XinguanReduce extends Reducer<Text, NumberInfo,Text, ResultXinguan> {
protected void reduce(Text key,Iterable values,Context context) throws IOException, InterruptedException {
if (values!=null){
for (NumberInfo value :values){
ResultXinguan res = new ResultXinguan(value);
context.write(key,res);
}
}
}
}

4.1最终对象

public class ResultXinguan implements Writable {
private NumberInfo numberInfo;
private double Mortality; //死亡率(死亡/确诊)
private double RecoveryRate;//康复率(康复/确诊)
private Long ActiveCases; //活跃病例(确诊-死亡-康复)
private Long Confirmed; //已确诊,累计
private Long Deaths; //死亡,累计
private Long Health; //恢复,累计
private String Province; //省份
public ResultXinguan() {
}
public ResultXinguan(NumberInfo numberInfo) { //数据处理
this.numberInfo = numberInfo;
Confirmed = this.numberInfo.getConfirmed();
Deaths = this.numberInfo.getDeaths();
Health = this.numberInfo.getHealth();
Province = this.numberInfo.getProvince();
ActiveCases = Confirmed-Deaths-Health;
if (Confirmed != 0) {
Mortality = (double)Deaths / Confirmed;
RecoveryRate = (double)Health / Confirmed;
} else {
Mortality = 0.0;
RecoveryRate = 0.0;
}
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeDouble(Mortality);
dataOutput.writeDouble(RecoveryRate);
dataOutput.writeLong(ActiveCases);
dataOutput.writeLong(Confirmed);
dataOutput.writeLong(Deaths);
dataOutput.writeLong(Health);
dataOutput.writeUTF(Province);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
Mortality = dataInput.readDouble();
RecoveryRate = dataInput.readDouble();
ActiveCases = dataInput.readLong();
Confirmed = dataInput.readLong();
Deaths = dataInput.readLong();
Health = dataInput.readLong();
Province = dataInput.readUTF();
}
@Override
public String toString() {
return Mortality + "\t" + RecoveryRate + "\t" + ActiveCases + "\t" + Confirmed + "\t" + Deaths + "\t" + Health + "\t" + Province;
}
}

5. 创建Main运行文件

public class XinguanMain extends Configured implements Tool {
@Override
public int run(String[] strings) throws Exception {
//判断输入是否正确
if (strings.length != 2){
System.out.println("Usage: XinguanMain input output");
return -1;
}
//获取用户读取数据、保存数据的路径
String inputPath = strings[0];
String outputPath = strings[1];
//创建实例,设置任务名称和类
Job job = Job.getInstance(super.getConf(),"mapreduce_calculation");
job.setJarByClass(XinguanMain.class);
//设置输入输出类型和路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("hdfs://hadoop100:9000"+inputPath));
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("hdfs://hadoop100:9000"+outputPath)); // 采用默认方式(分区,排序,规约,分组)
//设置Map阶段的类、输入输出
job.setMapperClass(XinguanMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NumberInfo.class);
//设置Reduce阶段的类、输入输出
job.setReducerClass(XinguanReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(ResultXinguan.class);
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
Configuration configured = new Configuration();
ToolRunner.run(configured,new XinguanMain(),args);
}
}

生成Jar包并执行任务#

1.  打包之后生成的文件在该项目的target文件夹下

2. 执行任务

  1. Main文件中,读取的地址设置的是HDFS路径
  2. 执行任务需要输入完整包名
  3. 输出路径必须不存在
[root@hadoop100 /] mv Hadoop-1.0-SNAPSHOT.jar xg.jar [root@hadoop100 /] hadoop jar xg.jar org.example.xinguan.XinguanMain /hdfs_inputPath /hdfs_outputPath

3. 查看数据结果

[root@hadoop100 /]hdfs dfs -get /hdfs_path/part-r-00000 txt[root@hadoop100 /]head txt2020-01-22 0.0 0.0 0 0 0 0 Gansu2020-01-22 0.0 0.0 26 26 0 0 Guangdong2020-01-22 0.0 0.0 2 2 0 0 Guangxi2020-01-22 0.0 0.0 1 1 0 0 Guizhou2020-01-22 0.0 0.0 4 4 0 0 Hainan2020-01-22 0.0 0.0 1 1 0 0 Hebei2020-01-22 0.0 0.0 0 0 0 0 Heilongjiang2020-01-22 0.0 0.0 5 5 0 0 Henan2020-01-22 0.0 0.0 0 0 0 0 Hong Kong2020-01-22 0.038288288288288286 0.06306306306306306 399 444 17 28 Hubei[root@hadoop100 /]wc -l txt6392 txt

数据导入Hive表中#

hive >load data inpath '/hdfs_path/part-r-00000' into table xg;

创建、配置Django项目和应用#

Django搭建与配置 不会搭建配置(mysql、app)可参考该pdf文件

创建数据模型#

python manage.py makemigrations 生成数据库迁移文件
python manage.py migrate 将生成的迁移文件应用到数据库,实际修改数据库结构。

1.  创建数据模型

class xgdata(models.Model): date_day = models.CharField(max_length=20) siwanglv = models.FloatField() kangfulv = models.FloatField() huoyuebingli = models.IntegerField() quezhen = models.IntegerField() siwang = models.IntegerField() huifu = models.IntegerField() pro = models.CharField(max_length=100) pro_cn =models.CharField(max_length=100)

2. 生成数据库迁移文件,修改数据库结构

manage.py@Hadoop > makemigrations appnamemanage.py@Hadoop > migrate appname

生成的迁移文件会保存在该app的migrations文件夹下

数据导入MySql表中#

[root@hadoop100] sqoop export \--connect jdbc:mysql://hadoop100/xg_xgdata \--username root \--password 123456 \--table xg \--target-dir /hdfs-path \--fields-terminated-by '\t'

编写请求响应view.py和urls.py映射#

视图层
一个视图函数,简称视图,是一个简单的 Python 函数,它接受 Web 请求并且返回 Web 响应。响应可以是一个 HTML 页面、一个 404 错误页面、重定向页面、XML 文档、或者一张图片…

# view.pydef get_xgdata(request): # 原始数据集,访问show页面返回数据集全部内容 xgdata_ODS = xgdata.objects.all() context = {'xgdata':xgdata_ODS} return render(resquest,'show.html',context)
# urls.pyurlpatterns = [ path('',xg.views.index), path('admin/', admin.site.urls), path('show.html/',xg.views.get_xgdata) #映射]

前后端数据交互#

编写html文件,我们可以使用jinja2语法,通过特征字段获取视图层编写的函数所传递的数据内容

<body> <div> {% for i in xgdata %} {{ i.date_day }} {{ i.kangfulv }} <br> {% endfor %} </div></body>

echarts数据可视化#

以折现图为例

<div id="line1" style="width:400px;height:200px"></div><script type="module">// 初始化 echarts 对象 var line1 = echarts.init(document.getElementById('line1')); var series_data = []; var xAxis_data = []; {% for i in xgdata_sum %} xAxis_data.push(new Date("{{ i.date_day }}").getTime()); series_data.push({{i.siwanglv}}*100) {% endfor %}// 构造 series.data 所需的数据格式 const data = xAxis_data.map((date, index) => [date, series_data[index]]);// ECharts 配置 const option = { title: { text: '全国累计死亡率时间变动图', left: 'center' }, tooltip: { //悬浮显示 trigger: 'axis', axisPointer: { type: 'cross' //十字架 }, formatter: function (params) { var date = echarts.format.formatTime('yyyy-MM-dd', params[0].value[0]); return `${date}<br/>死亡率: ${(params[0].value[1]).toFixed(2)}%`; } }, xAxis: { type: 'time', axisLabel: { formatter: function (value) { return echarts.format.formatTime('yyyy-MM-dd', value); } } }, yAxis: { type: 'value' }, series: [{ data: data, type: 'line', smooth:true, symbol:'none', }], markLine: { // 标记线,可用于显示平均值或其他参考线 data: [{ type: 'average', name: '平均值' }] }};// 使用 ECharts 实例设置配置line1.setOption(option);</script>

基于HIVE的疫情防控数据分析与可视化
https://minthana.github.io/blog/posts/基于hive的疫情防控数据分析与可视化/
作者
Mint
发布于
2025-02-18
许可协议
CC BY-NC-SA 4.0