基于Clickhouse建日志中心

一、项目背景介绍

经过十三年的发展,快递100目前C端累计注册用户超2.5亿、P端(专业用户)累计注册快递员及网点经营者超130万、B端累计服务电商企业/泛电商企业/品牌企业/政府与公共组织等客户超250万家;每天快递查询调用量超4亿次、寄件下单量超30万单。

公司的业务量和数据量是相对较大且复杂的,因此拥有一个实时性、可扩展性、并拥有强大的搜索与分析功能的日志中心至关重要,它不仅可以记录系统的性能、运行状态,还可以为我们提供很多有价值的业务数据和用户行为分析,这些都将为业务的洞察与决策提供有效支持,从而推动产品的迭代升级和运营策略调整。

同时,日志也是我们线上定位问题的重要手段,我们经常会依赖日志排查解决来自客户的问题,帮助我们提升服务质量,增强客户对公司的产品信赖。

如何构建一套适合我们的日志中心?在业务的发展过程中,日志的架构从原始的文件记录到ELK体系,我们也遇到了一系列的问题,经历实践和研究分析,我们最终构建了新一代的日志系统。下面就给大家分享整个过程。

二、初期架构1. 原始架构

把时间回溯到10年前,我们会怎样去记录日志?

架构比较简单,但问题也比较明显,有几个明显的缺点:

  • 每次查看日志文件都需要登录到不同的机器,非常不方便;
  • 通过 tail 或者 cat 等命令查看日志,如果对日志文件进行检索、聚合等操作,还会对服务器的 io 造成很大的压力,甚至导致故障的产生;
  • 日志文件过大不仅导致查询变得特别慢,还经常带来磁盘告警甚至磁盘空间不足等严重后果;
  • 日志格式不规范,日志随意写到文件,可读性和可分析性几乎为零;
  • 应用多节点挂载 NFS 性能差,容易产生日志丢失,从而影响问题定位和排障。

2.ELK体系

经历了这些问题之后,2017年,我们把目光转向新一代的基于Elasticsearch的日志体系,恰逢之前快递100订单检索引入了Elasticsearch,侧面又了解到基于Elastic Stack的ELK体系,经过一系列研究,开始意识到日志规范的重要性和采集的好处。

ELK是业界最成熟的日志技术栈,使用JSON格式存储,易于解析,再配合全文检索能力,能够快速从众多的日志信息中搜索到关键信息,加上Kibana的易用性,使得日志体验上升了几个档次,架构大致如下,不具体拆分细讲。

我们再回顾一下之前的挑战和缺点,可以看到基本解决了之前遇到的问题:

三、实践中的挑战

使用ELK一段时间之后,我们解决了从无到有的问题。但随着一系列基于日志进行分析和告警的工作逐步开展,新的问题也开始浮现。我们尝试进行优化后,效果并不明显,这促使我们重新考量架构的升级,主要的问题体现在:

  • 成本问题:ES压缩率不高,基于目前的日志量,合规性要求需要保留6个月,需要耗费巨大的存储成本;
  • 吞吐瓶颈:ES分词特性写入吞吐瓶颈问题,容易导致日志写入发生延迟;
  • 资源占用率高:ES在内存使用上的消耗过高;
  • 生命周期维护:ES旧版本TTL问题,需要手工介入数据过期,维护成本高;
  • 分析能力一般:由于更多的分析需求出现,ES的聚合能力受到了挑战。

针对上述的一些问题,在2020年开始了解到Clickhouse的存在,我们对ES和Clickhouse做了一个选型对比。基于对比得出的结论,最终决定选用 Clickhouse 作为下一代日志存储数据库。

四、新的架构体系

ELK体系经过多年的发展,生态已经非常强大,Clickhouse想达到同样的生态,需要更长的时间去发展,因此这个过程也需要投入一些研究或者开发量才能达到更好的效果,幸好 Clickhouse 本身的学习曲线较低,经过短时间的研究,我们制定了新的日志架构。

可以从几个组件的构成来看这个架构图,对比ELK体系与新的体系的不同:

  • 采集层:从Logstash到ilogtail,ilogtail性能更强,资源消耗更低;
  • 处理层:从Logstash到ilogtail,ilogtail还支持数据脱敏,多行拆分等实用功能;
  • 存储层:从Elasticsearch改为Clickhouse,选型过程已有对比,这里不再赘述;
  • 可视化:从Kibana到Clickvisual,这里有优点也有缺点,所以还是配合了Grafana才达到类似的效果。

Clickvisual的优点:灵活的SQL、日志审计、告警策略;

Kibana的优点:Kibana具备一些基础的BI功能,可用于日志分析。

1.新架构的成果

还是回顾一下之前的挑战,问题基本得到了解决:

2.基于Clickhouse的日志存储

基于10亿日志数据进行测试,得出磁盘占用的对比柱状图:

基于10亿数据的测试,在两者集群模式下,消费Kafka的速度对比

新的架构最核心的改变,就是将ES换成了Clickhouse,看中的就是极高的压缩率,最终的结果是同等存储条件下,原来ES只能保留一个月的数据,现在可以做到保留六个月,这其中少不了很多存储细节的优化,其中包含:

  • 大部分字段采用ZSTD压缩模式来提升压缩率;
  • 低基数LowCardinality的使用,节省存储的同时还做到性能提升;
  • 连续性时间字段的Delta+ZSTD压缩;
  • 冷热策略的配置,近一个月保留在SSD盘,一到六个月的数据自动流到HDD盘,六个月前数据自动清理。


建表语句如下:

3.基于Clickvisual的可视化

ClickVisual 是一个轻量级的开源日志查询、分析、报警的可视化平台,致力于提供一站式应用可靠性的可视化的解决方案。既可以独立部署使用,也可作为插件集成到第三方系统。目前是市面上唯一一款支持 ClickHouse 的类 Kibana 的开源业务日志查询平台。

它具备的特性,部分符合我们的需求:

  • 支持可视化的查询面板,可查询命中条数直方图和原始日志;
  • 支持设置日志索引功能,分析不同索引的占比情况;
  • 支持 Proxy Auth 功能,能被非常轻松地集成到第三方系统;
  • 支持基于 ClickHouse 日志的实时报警功能。

还提供了原始的SQL查询功能,直接输入SQL聚合语句,即时简单地对日志进行聚合分析:

体验总体类似Kibana,细节稍有不足,通过这个查询分析界面作为一个入口,搭配日志告警模块,快速定位问题和故障排除方面的能力得到了大大的提升,基本无缝从Kibana上切换过来,拥有不错的排障体验。

五、进一步优化

诚然,做到上述的效果还是不足以满足我们的要求。因而在此基础上,我们进行了优化方面的思考 ,其中也踩了一些Clickhouse的坑,使用了一些Clickhouse的新特性,是一个很有意思的过程。1.日志查询优化探索

得益于Clickhouse的高压缩率和查询性能,小日志量的表直接配合时间分区搜索即可,但是当日志量涨到一定程度的时候,查询缓慢总是一个难受的事,我们对一些场景进行了总结:

  • traceid场景:在 Skywalking 中根据 traceid 查询链路日志时,使用 tokenbf_v1索引,并通过 hasToken 查询,由于跳过大部分无效 parts,可快速命中返回;
  • 无结构化日志:对于这种无结构化日志,用 like 性能会非常慢且消耗 CPU 甚至内存,新版本的 Clickhouse 已经支持了倒排索引,也开始基于倒排— 索引优化,可大大提高响应速度;
  • 聚合场景:一些常规的聚合需求,可通过Clickhouse的Projection功能来满足。

2.本地表还是分布式表

本地表是Clickhouse的存储表,分布式表只是逻辑表,本身并不存储数据,在日志高频写入的场景,还是推荐写本地表,原因有这么几点:

  • 当我们大批量写入日志时,可以直接往分布式表写,但数据会先拆分成不同parts,再通过Zookeeper进行分发,增加了集群间网络的负载,导致写入变慢,甚至出现Too many parts问题
  • 写分布式表更容易出现数据一致性问题
  • Zookeeper压力变大

3.Clickhouse的限制策略

随着日志中心的建设,日志体量越来越大,开始暴露一些配置层面的问题,我们开始对 Clickhouse 增加一些限制,以免错误的 SQL 导致集群缓慢甚至 OOM 的问题,对于日志查询用户,单独做了 SQL 复杂度的限制,users.xml 中有几个参数:

  • max_memory_usage:单个服务器上运行查询的最大内存;
  • max_memory_usage_for_user:单个服务器上运行用户查询的最大内存;
  • max_memory_usage_for_all_queries:单个服务器上运行所有查询的最大内存;
  • max_rows_to_read:运行查询时可从表中读取的最大行数;
  • max_result_rows:限制结果中的行数;
  • max_bytes_to_read:运行查询时可以从表中读取的最大字节数(未压缩数据)。

结语

在这篇文章中,我们分享了快递100在云原生技术方面的实践、思考和应用,特别是在日志中心建设方面的实践。日志中心的上线应用,让我们在问题定位方面的效率得到了极大的提升,系统更加稳定和可靠;同时,通过对日志的收集、分析和挖掘,我们也更好地了解了用户需求和行为,优化了产品设计和运营策略,促进业务高速增长。

from:https://mp.weixin.qq.com/s?__biz=MzkzMjYzNjkzNw==&mid=2247612190&idx=1&sn=046e724f035dedd610ffb43e56ae07ea&source=41#wechat_redirect

(转)Iceberg文件组织

使用以下 SQL 创建名为user_log_iceberg的 Iceberg 表并插入一条数据:

create table hadoop_catalog.iceberg_db.user_log_iceberg (
    imei string,
    uuid string,
    udt timestamp
)
using iceberg
partitioned by (days(udt));

insert into hadoop_catalog.iceberg_db.user_log_iceberg values ('xxxxxxxxxxxxx', 'yyyyyyyyyyyyy', cast(1640966400 as timestamp));

user_log_iceberg 在文件系统中的存储结构为:

user_log_iceberg/
├── data
│   └── udt_day=2021-12-31
│       └── 00000-0-67ab9286-794b-456d-a1d3-9c797a2b4b03-00001.parquet
└── metadata
    ├── f9d66153-6745-4103-ad24-334fc62f0d1e-m0.avro
    ├── snap-6744647507914918603-1-f9d66153-6745-4103-ad24-334fc62f0d1e.avro
    ├── v1.metadata.json
    ├── v2.metadata.json
    └── version-hint.text

可以看到该表目录下有两个子文件夹:data 和 metadata。其中 data 就是真正的数据文件目录,metadata 是该表的元数据目录,这里的 metadata 就是替代 Hive 中的 Metastore 服务的。

在了解 Iceberg 元数据管理之前先看几个概念:

  1. SnapshotSnapshot 就是表在某个时间点的状态,其中包括该时间点所有的数据文件。Iceberg 对表的每次更改都会新增一个 Snapshot。
  2. Metadata File每新增一个 Snapshot 就会新增一个 Metadata 文件,该文件记录了表的存储位置、Schema 演化信息、分区演化信息以及所有的 Snapshot 以及所有的 Manifest List 信息。
  3. Manifest ListManifest List 是一个元数据文件,其中记录了所有组成快照的 Manifest 文件信息。
  4. Manifest FileManifest File 是记录 Iceberg 表快照的众多元数据文件的其中一个。其中的每一行都记录了一个数据文件的分区,列级统计等信息。 一个 Manifest List 文件中可以包含多个 Manifest File 的信息。
  5. Partition Spec表示字段值和分区值之间的逻辑关系。
  6. Data File包含表中所有行的文件。
  7. Delete File对按位置或数据值删除的表行进行编码的文件。
iceberg metadata

从上图中可以看出 Iceberg 表通过三级关系管理表数据,下面以 Spark 中的 spark.sql.catalog.hadoop_prod.type=hadoop 为例说明。:

最上层中记录了 Iceberg 表当前元数据的版本,对应的是version-hint.text文件,version-hint.text文件中只记录了一个数字表示当前的元数据版本,初始为 1,后续表每变更一次就加 1。

中间层是元数据层。其中 Metadata File 记录了表的存储位置、Schema 演化信息、分区演化信息以及所有的 Snapshot 和 Manifest List 信息,对应的是v1.metadata.jsonv2.metadata.json文件,其中v后面的数字和version-hint.text文件中的数字对应,每当新增一个 Snapshot 的时候,version-hint.text中的数字加 1,同时也会新增一个vx.metadata.json文件,比如执行insert into hadoop_catalog.iceberg_db.user_log_iceberg values ('xxxxxxxxxxxxx', 'yyyyyyyyyyyyy', cast(1640986400 as timestamp))delete from hadoop_catalog.iceberg_db.user_log_iceberg where udt = cast(1640986400 as timestamp)之后,版本就会变成v4:

user_log_iceberg/
├── data
│   └── udt_day=2021-12-31
│       ├── 00000-0-67ab9286-794b-456d-a1d3-9c797a2b4b03-00001.parquet
│       └── 00000-0-88d582ef-605e-4e51-ba98-953ee3dd4c02-00001.parquet
└── metadata
    ├── b3b1643b-56a2-471e-a4ec-0f87f1efcd80-m0.avro
    ├── ecb9255a-bcc5-4954-a4e9-3a54f5b09500-m0.avro
    ├── f9d66153-6745-4103-ad24-334fc62f0d1e-m0.avro
    ├── snap-4140724156423386841-1-ecb9255a-bcc5-4954-a4e9-3a54f5b09500.avro
    ├── snap-6744647507914918603-1-f9d66153-6745-4103-ad24-334fc62f0d1e.avro
    ├── snap-8046643380197343006-1-b3b1643b-56a2-471e-a4ec-0f87f1efcd80.avro
    ├── v1.metadata.json
    ├── v2.metadata.json
    ├── v3.metadata.json
    ├── v4.metadata.json
    └── version-hint.text

查看v4.metadata.json中的内容如下:点击查看

{
  "format-version": 1,
  "table-uuid": "c69c9f46-b9d8-40cf-99da-85f55cb7bffc",
  "location": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg",
  "last-updated-ms": 1647772606874,
  "last-column-id": 3,
  "schema": {
    "type": "struct",
    "schema-id": 0,
    "fields": [
      {
        "id": 1,
        "name": "imei",
        "required": false,
        "type": "string"
      },
      {
        "id": 2,
        "name": "uuid",
        "required": false,
        "type": "string"
      },
      {
        "id": 3,
        "name": "udt",
        "required": false,
        "type": "timestamptz"
      }
    ]
  },
  "current-schema-id": 0,
  "schemas": [
    {
      "type": "struct",
      "schema-id": 0,
      "fields": [
        {
          "id": 1,
          "name": "imei",
          "required": false,
          "type": "string"
        },
        {
          "id": 2,
          "name": "uuid",
          "required": false,
          "type": "string"
        },
        {
          "id": 3,
          "name": "udt",
          "required": false,
          "type": "timestamptz"
        }
      ]
    }
  ],
  "partition-spec": [
    {
      "name": "udt_day",
      "transform": "day",
      "source-id": 3,
      "field-id": 1000
    }
  ],
  "default-spec-id": 0,
  "partition-specs": [
    {
      "spec-id": 0,
      "fields": [
        {
          "name": "udt_day",
          "transform": "day",
          "source-id": 3,
          "field-id": 1000
        }
      ]
    }
  ],
  "last-partition-id": 1000,
  "default-sort-order-id": 0,
  "sort-orders": [
    {
      "order-id": 0,
      "fields": []
    }
  ],
  "properties": {
    "owner": "PowerYang"
  },
  "current-snapshot-id": 4140724156423386600,
  "snapshots": [
    {
      "snapshot-id": 6744647507914919000,
      "timestamp-ms": 1647758232673,
      "summary": {
        "operation": "append",
        "spark.app.id": "local-1647757937137",
        "added-data-files": "1",
        "added-records": "1",
        "added-files-size": "1032",
        "changed-partition-count": "1",
        "total-records": "1",
        "total-files-size": "1032",
        "total-data-files": "1",
        "total-delete-files": "0",
        "total-position-deletes": "0",
        "total-equality-deletes": "0"
      },
      "manifest-list": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/snap-6744647507914918603-1-f9d66153-6745-4103-ad24-334fc62f0d1e.avro",
      "schema-id": 0
    },
    {
      "snapshot-id": 8046643380197343000,
      "parent-snapshot-id": 6744647507914919000,
      "timestamp-ms": 1647772293740,
      "summary": {
        "operation": "append",
        "spark.app.id": "local-1647770527459",
        "added-data-files": "1",
        "added-records": "1",
        "added-files-size": "1032",
        "changed-partition-count": "1",
        "total-records": "2",
        "total-files-size": "2064",
        "total-data-files": "2",
        "total-delete-files": "0",
        "total-position-deletes": "0",
        "total-equality-deletes": "0"
      },
      "manifest-list": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/snap-8046643380197343006-1-b3b1643b-56a2-471e-a4ec-0f87f1efcd80.avro",
      "schema-id": 0
    },
    {
      "snapshot-id": 4140724156423386600,
      "parent-snapshot-id": 8046643380197343000,
      "timestamp-ms": 1647772606874,
      "summary": {
        "operation": "delete",
        "spark.app.id": "local-1647770527459",
        "deleted-data-files": "1",
        "deleted-records": "1",
        "removed-files-size": "1032",
        "changed-partition-count": "1",
        "total-records": "1",
        "total-files-size": "1032",
        "total-data-files": "1",
        "total-delete-files": "0",
        "total-position-deletes": "0",
        "total-equality-deletes": "0"
      },
      "manifest-list": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/snap-4140724156423386841-1-ecb9255a-bcc5-4954-a4e9-3a54f5b09500.avro",
      "schema-id": 0
    }
  ],
  "snapshot-log": [
    {
      "timestamp-ms": 1647758232673,
      "snapshot-id": 6744647507914919000
    },
    {
      "timestamp-ms": 1647772293740,
      "snapshot-id": 8046643380197343000
    },
    {
      "timestamp-ms": 1647772606874,
      "snapshot-id": 4140724156423386600
    }
  ],
  "metadata-log": [
    {
      "timestamp-ms": 1647757946953,
      "metadata-file": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/v1.metadata.json"
    },
    {
      "timestamp-ms": 1647758232673,
      "metadata-file": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/v2.metadata.json"
    },
    {
      "timestamp-ms": 1647772293740,
      "metadata-file": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/v3.metadata.json"
    }
  ]
}

可以看到snapshots属性中记录了多个 Snapshot 信息,每个 Snapshot 中包含了 snapshot-id、parent-snapshot-id、manifest-list 等信息。通过最外层的 current-snapshot-id 可以定位到当前 Snapshot 以及 manifest-list 文件为/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/snap-4140724156423386841-1-ecb9255a-bcc5-4954-a4e9-3a54f5b09500.avro。使用java -jar avro-tools-1.11.0.jar tojson snap-4140724156423386841-1-ecb9255a-bcc5-4954-a4e9-3a54f5b09500.avro > manifest_list.json将该文件转换成 json 格式,查看其内容:点击查看

({
  "manifest_path": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/ecb9255a-bcc5-4954-a4e9-3a54f5b09500-m0.avro",
  "manifest_length": 6127,
  "partition_spec_id": 0,
  "added_snapshot_id": {
    "long": 4140724156423386841
  },
  "added_data_files_count": {
    "int": 0
  },
  "existing_data_files_count": {
    "int": 0
  },
  "deleted_data_files_count": {
    "int": 1
  },
  "partitions": {
    "array": [
      {
        "contains_null": false,
        "contains_nan": {
          "boolean": false
        },
        "lower_bound": {
          "bytes": "0J\u0000\u0000"
        },
        "upper_bound": {
          "bytes": "0J\u0000\u0000"
        }
      }
    ]
  },
  "added_rows_count": {
    "long": 0
  },
  "existing_rows_count": {
    "long": 0
  },
  "deleted_rows_count": {
    "long": 1
  }
},
{
  "manifest_path": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/metadata/f9d66153-6745-4103-ad24-334fc62f0d1e-m0.avro",
  "manifest_length": 6128,
  "partition_spec_id": 0,
  "added_snapshot_id": {
    "long": 6744647507914918603
  },
  "added_data_files_count": {
    "int": 1
  },
  "existing_data_files_count": {
    "int": 0
  },
  "deleted_data_files_count": {
    "int": 0
  },
  "partitions": {
    "array": [
      {
        "contains_null": false,
        "contains_nan": {
          "boolean": false
        },
        "lower_bound": {
          "bytes": "0J\u0000\u0000"
        },
        "upper_bound": {
          "bytes": "0J\u0000\u0000"
        }
      }
    ]
  },
  "added_rows_count": {
    "long": 1
  },
  "existing_rows_count": {
    "long": 0
  },
  "deleted_rows_count": {
    "long": 0
  }
})

里面包含了两条 json 数据,分别对应了个 Manifest 文件信息,除了 Manifest 文件的路径之外还有一些统计信息。使用java -jar avro-tools-1.11.0.jar tojson f9d66153-6745-4103-ad24-334fc62f0d1e-m0.avro > manifest_1.jsonjava -jar avro-tools-1.11.0.jar tojson ecb9255a-bcc5-4954-a4e9-3a54f5b09500-m0.avro > manifest_2.json将两个 Manifest 文件转为 json 格式,观察其内容:点击查看

{
  "status": 1,
  "snapshot_id": {
    "long": 6744647507914918603
  },
  "data_file": {
    "file_path": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/data/udt_day=2021-12-31/00000-0-67ab9286-794b-456d-a1d3-9c797a2b4b03-00001.parquet",
    "file_format": "PARQUET",
    "partition": {
      "udt_day": {
        "int": 18992
      }
    },
    "record_count": 1,
    "file_size_in_bytes": 1032,
    "block_size_in_bytes": 67108864,
    "column_sizes": {
      "array": [
        {
          "key": 1,
          "value": 48
        },
        {
          "key": 2,
          "value": 48
        },
        {
          "key": 3,
          "value": 51
        }
      ]
    },
    "value_counts": {
      "array": [
        {
          "key": 1,
          "value": 1
        },
        {
          "key": 2,
          "value": 1
        },
        {
          "key": 3,
          "value": 1
        }
      ]
    },
    "null_value_counts": {
      "array": [
        {
          "key": 1,
          "value": 0
        },
        {
          "key": 2,
          "value": 0
        },
        {
          "key": 3,
          "value": 0
        }
      ]
    },
    "nan_value_counts": {
      "array": []
    },
    "lower_bounds": {
      "array": [
        {
          "key": 1,
          "value": "xxxxxxxxxxxxx"
        },
        {
          "key": 2,
          "value": "yyyyyyyyyyyyy"
        },
        {
          "key": 3,
          "value": "\u0000@\\CsÔ\u0005\u0000"
        }
      ]
    },
    "upper_bounds": {
      "array": [
        {
          "key": 1,
          "value": "xxxxxxxxxxxxx"
        },
        {
          "key": 2,
          "value": "yyyyyyyyyyyyy"
        },
        {
          "key": 3,
          "value": "\u0000@\\CsÔ\u0005\u0000"
        }
      ]
    },
    "key_metadata": null,
    "split_offsets": {
      "array": [4]
    },
    "sort_order_id": {
      "int": 0
    }
  }
}
{
  "status": 2,
  "snapshot_id": {
    "long": 4140724156423386841
  },
  "data_file": {
    "file_path": "/opt/module/spark-3.2.1/spark-warehouse/iceberg_db/user_log_iceberg/data/udt_day=2021-12-31/00000-0-88d582ef-605e-4e51-ba98-953ee3dd4c02-00001.parquet",
    "file_format": "PARQUET",
    "partition": {
      "udt_day": {
        "int": 18992
      }
    },
    "record_count": 1,
    "file_size_in_bytes": 1032,
    "block_size_in_bytes": 67108864,
    "column_sizes": {
      "array": [
        {
          "key": 1,
          "value": 48
        },
        {
          "key": 2,
          "value": 48
        },
        {
          "key": 3,
          "value": 51
        }
      ]
    },
    "value_counts": {
      "array": [
        {
          "key": 1,
          "value": 1
        },
        {
          "key": 2,
          "value": 1
        },
        {
          "key": 3,
          "value": 1
        }
      ]
    },
    "null_value_counts": {
      "array": [
        {
          "key": 1,
          "value": 0
        },
        {
          "key": 2,
          "value": 0
        },
        {
          "key": 3,
          "value": 0
        }
      ]
    },
    "nan_value_counts": {
      "array": []
    },
    "lower_bounds": {
      "array": [
        {
          "key": 1,
          "value": "xxxxxxxxxxxxx"
        },
        {
          "key": 2,
          "value": "yyyyyyyyyyyyy"
        },
        {
          "key": 3,
          "value": "\u0000\btëwÔ\u0005\u0000"
        }
      ]
    },
    "upper_bounds": {
      "array": [
        {
          "key": 1,
          "value": "xxxxxxxxxxxxx"
        },
        {
          "key": 2,
          "value": "yyyyyyyyyyyyy"
        },
        {
          "key": 3,
          "value": "\u0000\btëwÔ\u0005\u0000"
        }
      ]
    },
    "key_metadata": null,
    "split_offsets": {
      "array": [4]
    },
    "sort_order_id": {
      "int": 0
    }
  }
}

可以看到,每个 Manifest 文件中的每一行都对应一个 data 目录下的数据文件,除了记录数据文件的路径之外,还记录了该数据文件对应的文件格式、分区信息、以及尽可能详细地记录了各个字段的统计信息、排序信息等。Manifest 文件中的 status,表示此次操作的类型,1 表示 add,2 表示 delete。

可以发现 Iceberg 中对数据文件的管理是文件级别,分区管理、字段统计也是到文件级别,而不是目录级别,这也是为什么 Iceberg 扫描要比 Hive 快的原因。在扫描计划里,查询谓词会自动转换为分区数据的谓词,并首先应用于过滤数据文件。接下来,使用列级值计数、空计数、下限和上限来消除无法匹配查询谓词的文件,在某些情况下可以提升数十倍效率。

但是由于 Iceberg 用 json 文件存储 Metadata,表的每次更改都会新增一个 Metadata 文件,以保证操作的原子性。历史 Metadata 文件不会删除,所以像流式作业就需要定期清理 Metadata 文件,因为频繁的提交会导致堆积大量的 Metadata。可以通过配置write.metadata.delete-after-commit.enabledwrite.metadata.previous-versions-max属性实现自动清理元数据。

提示

前面是以 spark.sql.catalog.hadoop_prod.type=hadoop举例,如果 spark.sql.catalog.hadoop_prod.type=hive,文件组织方式会稍有不同,如:

  1. 没有version-hint.text文件,而是通过 Metastore 服务来记录当前版本指针;
  2. Metadata File 的命名不再是vx.metadata.json的方式,.metadata.json前面的vx部分将是一个很长的 UUID。

from:https://www.sqlboy.tech/pages/19baa8/

(转)「大数据」Hive 分区和分桶的区别及示例讲解

一、概述

在大数据处理过程中,Hive是一种非常常用的数据仓库工具。Hive分区和分桶是优化Hive性能的两种方式,它们的区别如下:

1)分区概述

Hive分区是把数据按照某个属性分成不同的数据子集。

  • 在Hive中,数据被存储在HDFS中,每个分区实际上对应HDFS下的一个文件夹,这个文件夹中保存了这个分区的数据。
  • 因此,在Hive中使用分区,实际上是将数据按照某个属性值进行划分,然后将相同属性值的数据存储在同一个文件夹中。Hive分区的效率提升主要是因为,当进行查询操作时,只需读取与查询相关的数据分区,避免了全表扫描,节约了查询时间

Hive分区的主要作用是:

  • 提高查询效率: 使用分区对数据进行访问时,系统只需要读取和此次查询相关的分区,避免了全表扫描,从而显著提高查询效率。
  • 降低存储成本: 分区可以更加方便的删除过期数据,减少不必要的存储。

2)分桶概述

Hive分桶是将数据划分为若干个存储文件,并规定存储文件的数量。

  • Hive分桶的实现原理是将数据按照某个字段值分成若干桶,并将相同字段值的数据放到同一个桶中。在存储数据时,桶内的数据会被写入到对应数量的文件中,最终形成多个文件。
  • Hive分桶主要是为了提高分布式查询的效率。它能够通过将数据划分为若干数据块来将大量数据分发到多个节点,使得数据均衡分布到多个机器上处理。这样分发到不同节点的数据可以在本地进行处理,避免了数据的传输和网络带宽的浪费,同时提高了查询效率。

分桶的主要作用是:

  • 数据聚合: 分桶可以使得数据被分成较小的存储单元,提高了数据统计和聚合的效率。
  • 均衡负载: 数据经过分桶后更容易实现均衡负载,数据可以分发到多个节点中,提高了查询效率。

综上所述,分区和分桶的区别在于其提供的性能优化方向不同。分区适用于对于数据常常进行的聚合查询数据分析,而分桶适用于对于数据的均衡负载、高效聚合等方面的性能优化。当数据量较大、查询效率比较低时,使用分区和分桶可以有效优化性能。分区主要关注数据的分区和存储,而分桶则重点考虑数据的分布以及查询效率。

二、环境准备

如果已经有了环境了,可以忽略,如果想快速部署环境可以参考我这篇文章:通过 docker-compose 快速部署 Hive 详细教程

三、外部表和管理表

在Hive中,可以创建两种类型的表:外部表和管理表。它们之间的主要区别如下:

1)外部表

1、外部表介绍

外部表是指在Hive中创建的表,实际上其数据是存储在外部文件系统(HDFS或本地文件系统)中的。

  • 外部分区表是一种特殊类型的表,它们的数据存储在Hive之外的文件系统上,例如HDFS、S3等。
  • 对于外部分区表,Hive只会管理它们的元数据信息,而不会管理数据文件本身,这意味着,如果你使用Hive命令删除一个外部分区表,只会删除该表的元数据,而不会删除数据文件
  • 外部分区表通常用于存储和管理原始数据,这些数据通常需要在多个系统和工具之间共享。

2、示例讲解

【示例一】下面是创建Hive外部表的一个示例(数据存储在HDFS):

假设我们有一个存储在 HDFS 上的数据文件,其路径为’/user/hive/external_table/data’,我们可以通过以下语句,在Hive中创建一个外部表:

# 登录容器docker exec -it hive-hiveserver2 
# 登录hive客户端beeline -u jdbc:hive2://hive-hiveserver2:10000  -n hadoop
# 建表
CREATE EXTERNAL TABLE external_table1 (
    column1 STRING,
    column2 INT,
    column3 DOUBLE)
ROW FORMAT DELIMITEDFIELDS TERMINATED BY ','LINES TERMINATED BY '\n'STORED AS TEXTFILELOCATION '/user/hive/external_table/data';

在该表中,我们指定了表的各列的数据类型和分隔符等信息,并且使用了LOCATION 关键字来指定数据文件的存储位置。这样,在Hive中对该外部表进行查询操作时,Hive会自动去对应的位置读取数据文件,并据此返回查询结果。

load 数据

# 模拟一些数据cat >data<<EOFc1,12,56.33c2,14,58.99c3,15,66.34c4,16,76.78EOF
# 登录hive客户端beeline -u jdbc:hive2://hive-hiveserver2:10000  -n hadoop# 加载数据,local 是加载本机文件数据load data local inpath './data' into table external_table1;

需要注意的是,在使用外部表时,我们必须保证Hive对数据文件的访问权限与HDFS的文件权限相同,否则会导致外部表的查询失败。此外,在使用外部表时,务必不要删除外部表的数据文件,否则将会导致查询结果的不准确。

【示例一】下面是创建外部表访问本地数据文件的示例(数据存储在本地,很少使用):

在Hive中,我们同样可以创建外部表来访问本地文件系统上的数据文件。在这种情况下,我们需要注意的是,在Hive的配置中,必须开启hive.stats.autogather 功能。否则,在查询外部表时可能会出现错误。

假设我们有一个存储在本地文件系统上的数据文件,路径为’/path/to/local/file’,我们可以通过以下语句,在Hive中创建一个外部表:

CREATE EXTERNAL TABLE external_table2 (
    column1 STRING,
    column2 INT,
    column3 DOUBLE)ROW FORMAT DELIMITEDFIELDS TERMINATED BY ','LINES TERMINATED BY '\n'STORED AS TEXTFILELOCATION 'file:///path/to/local/file';### hive文件存储格式包括以下几类(STORED AS TEXTFILE):#1、TEXTFILE#2、SEQUENCEFILE#3、RCFILE#4、ORCFILE(0.11以后出现)#其中TEXTFILE为默认格式,建表时不指定默认为这个格式,导入数据时会直接把数据文件拷贝到hdfs上不进行处理;

需要注意的是,我们在使用LOCATION关键字时,要指定为’file:///path/to/local/file’,而不是 ‘/path/to/local/file’ ,这是因为我们需要使用文件系统的URL来访问本地文件系统上的数据文件。

2)管理表(内部表)

1、管理表(内部表)介绍

管理表是利用Hive自身的存储能力来对数据进行存储和管理的表。在Hive中创建管理表时,必须指定数据的存储路径。

  • 管理表也称为内部表(Internal Table),管理表是Hive默认创建的表类型,它的数据存储在Hive默认的文件系统上(通常是HDFS)。
  • Hive会自动管理这些表的数据和元数据,包括表的位置、数据格式等。如果你使用Hive命令删除了一个管理表,那么该表的数据也会被删除
  • 通常情况下,管理表用于存储和管理中间结果、汇总数据和基础数据。当数据规模较小时,管理表是一个不错的选择,因为它可以提供更好的查询性能,同时也更容易管理。

2、示例讲解

在Hive中,除了外部表,我们还可以创建内部表来存储数据。与外部表不同的是,内部表存储的数据位于Hive自身管理的HDFS上,因此,在创建内部表时,我们需要确保数据可以被正确地上传到HDFS上。下面是创建内部表并存储在本机的示例:

假设我们有以下数据文件,名为data.csv,存储在本地文件系统的/path/to/local目录下:

cat >data.csv<<EOFvalue1,1,2.3value2,2,3.4value3,3,4.5EOF

我们可以使用以下语句,在Hive中创建一个内部表:

CREATE TABLE internal_table (    column1 STRING,    column2 INT,    column3 DOUBLE)ROW FORMAT DELIMITEDFIELDS TERMINATED BY ','LINES TERMINATED BY '\n'STORED AS TEXTFILE;# 加载本地数据,LOCAL LOAD DATA LOCAL INPATH './data.csv' INTO TABLE internal_table;# 加载HDFS数据# 先将文件推送到HDFS上hdfs dfs -put ./data.csv /tmp/# 登录hive客户端beeline -u jdbc:hive2://hive-hiveserver2:10000  -n hadoop# 加载HDFS上的数据LOAD DATA INPATH '/tmp/data.csv' INTO TABLE internal_table;# 查询select * from internal_table;


总之,外部表和管理表都可以在Hive中实现数据的存储和管理,但它们之间的不同主要体现在数据的存储和处理方式上。

四、分区表之静态分区和动态分区

Hive中的分区表可以进一步细分为静态分区和动态分区。

静态分区是指通过手动指定分区列的值来创建分区。例如,在创建一个基于年份的分区表时,我们可以手动指定每个分区名对应的年份:

CREATE TABLE sales (
  id int,
  date string,
  amount double)PARTITIONED BY (year string);
ALTER TABLE sales ADD PARTITION (year='2019') location '/data/sales/2022';
ALTER TABLE sales ADD PARTITION (year='2020') location '/data/sales/2023';

在上述示例中,我们通过 ALTER TABLE 语句手动添加了2019和2020两个年份的分区。

动态分区是指在加载数据时通过SQL语句自动创建分区。例如,在从一个包含销售记录的数据文件中加载数据时,可以自动根据数据中的年份信息创建相应的分区:

INSERT INTO TABLE sales PARTITION (year)SELECT id, date, amount, YEAR(date)FROM raw_sales;

在上述示例中,我们使用 PARTITION 子句指定在 CREATE TABLE 语句中定义的分区列year,并使用 YEAR(date) 表达式从数据中提取出年份信息。

动态分区的优点在于它可以大大简化创建和管理分区表的过程并提高效率;但是需要注意的是,它可能会在某些情况下产生不可预期的行为,例如可能创建太多分区。

总之,静态分区和动态分区都是用于在Hive中管理大型数据集的有效工具,具体使用需要根据具体情况选择最适合的方法,并理解它们的优点和缺点。

五、hive分区表严格模式和非严格模式

Hive分区表的严格模式和非严格模式可以通过以下两个参数进行设置:

  1. hive.exec.dynamic.partition.mode:该参数用于设置分区模式,其默认值为strict,即严格模式。可以将其设置为nonstrict,即非严格模式
# 登录hive客户端beeline -u jdbc:hive2://hive-hiveserver2:10000  -n hadoop# 设置SET hive.exec.dynamic.partition.mode=nonstrict;
  1. hive.exec.max.dynamic.partitions:该参数用于限制动态分区的最大数量。在非严格模式下,当动态分区的数量超过该参数指定的值时,Hive将抛出异常。可以通过以下语句来修改该参数:
SET hive.exec.max.dynamic.partitions=<value>;

其中,<value> 为一个整数值,表示限制的动态分区数量。如果需要取消该限制,可以将该参数设置为一个非正数,例如:

SET hive.exec.max.dynamic.partitions=-1;

需要注意的是,这些参数的设置仅对当前会话有效,也可以将其添加到Hive的配置文件中以在每个会话中自动应用。

总之,hive.exec.dynamic.partition.mode 和 hive.exec.max.dynamic.partitions 是控制Hive分区表严格模式和非严格模式的两个重要参数,开发人员可以根据自己的需求进行设置。

1)严格模式

严格模式要求在加载数据时必须指定所有分区列的值,否则将会导致抛出异常。例如,在下面的分区表中:

CREATE TABLE sales (
  id int,
  date string,
  amount double)PARTITIONED BY (year string, month string, day string)CLUSTERED BY (id) INTO 10 BUCKETS;

在严格模式下,我们必须为year、month和day三个分区列的所有可能取值指定一个分区:

INSERT INTO TABLE sales PARTITION (year='2019', month='01', day='03')SELECT id, date, amountFROM raw_salesWHERE YEAR(date) = 2019 AND MONTH(date) = 1 AND DAY(date) = 3;

在上述示例中,我们使用 PARTITION 子句手动为分区列year、month、day指定取值。

2)非严格模式

非严格模式则允许忽略某些分区列的值,这样使用 INSERT INTO 语句时只需指定提供的分区值即可。例如:

# SET hive.exec.dynamic.partition.mode=nonstrict;INSERT INTO TABLE sales PARTITION (year, month, day)SELECT id, YEAR(date), MONTH(date), DAY(date), amountFROM raw_salesWHERE YEAR(date) = 2019;

在上述示例中,我们使用 SET 语句设置分区模式为非严格模式,然后只提供了year分区列的值,而month和day分区列的值是从数据中动态计算得出的。

使用非严格模式可以简化分区表的创建和管理,但需要注意,它可能会产生一些意料之外的结果(例如可能创建太多分区),所以需要谨慎使用。

总之,分区表的严格模式和非严格模式都具有一些优点和缺点,具体使用需要根据具体情况选择最适合的方式。

六、分区表和分桶表示例讲解

1)分区表示例讲解

在Hive中,我们可以使用分区表来更有效地组织和管理数据。分区表将数据分为子集,每个子集对应一个或多个分区。这样,我们就可以更快地访问和查询数据,而不必扫描整个数据集。

创建分区表的语法类似于创建普通表,只不过要使用 PARTITIONED BY 子句指定一个或多个分区列,例如:

# 内部表CREATE TABLE partitioned_internal_table (
  id INT,
  mesg STRING)PARTITIONED BY (
  year INT,
  month INT)
ROW FORMAT DELIMITEDFIELDS TERMINATED BY ','LINES TERMINATED BY '\n'STORED AS TEXTFILE;
# 外部表
CREATE EXTERNAL TABLE partitioned_external_table (
  id INT,
  mesg STRING)PARTITIONED BY (
  year INT,
  month INT)
ROW FORMAT DELIMITEDFIELDS TERMINATED BY ','LINES TERMINATED BY '\n'STORED AS TEXTFILELOCATION '/user/hive/partitioned_table/data';

上述语句创建了一个分区表,在列column1和column2的基础上,按照year和month两列进行了分区。

【注意】分区的实现依赖于Hive的底层存储Hadoop分布式文件系统(HDFS)。为了确定如何分配数据,Hive要求每个分区对应一个目录,该目录包含该分区数据的所有文件。因此,在将数据加载到分区表中时,必须提供与分区对应的目录

例如,如果我们要将一个CSV文件加载到分区表中,我们可以使用以下语句:

LOAD DATA LOCAL INPATH './file.csv' INTO TABLE partitioned_external_table PARTITION (year=2019, month=1);# 查看分区show partitions partitioned_external_table;

在上述语句中,我们使用 LOAD DATA 子句将 /data/file.csv 文件加载到partitioned_table 表中,并指定了分区year为2019,分区month为1。

假设我们的CSV文件具有以下内容:

1,test1,2019,11,test2,2019,12,test3,2022,13,test4,2023,1

使用以下语句查询分区表:

SELECT * FROM partitioned_external_table WHERE year=2019 AND month=1;

分区表的优点在于可以更高效地组织数据,同时也允许我们根据需要删除或添加分区。例如,我们可以使用以下语句删除分区:

ALTER TABLE partitioned_table DROP PARTITION (year=2019, month=1);

可以使用以下语句添加分区:

ALTER TABLE partitioned_external_table ADD PARTITION (year=2020, month=2);
# 查看分区show partitions partitioned_external_table;

总之,分区表是管理和查询大型数据集的有效方式,可以帮助我们更轻松地处理大量数据。

2)分桶表示例讲解

除了分区表之外,Hive还提供了另一种将数据分割成可管理单元的方式,即分桶。

分区和分桶的概念有一些相似之处,但也存在一些重要的区别。

  • 分区是指基于表的某些列将数据分割成不同的存储单元;
  • 而分桶是指将数据根据哈希函数分成一组固定的桶。

类比于分区,在创建一个分桶表时,我们需要指定分桶的数量和分桶的列。例如,以下是一个创建分桶表的示例:

CREATE TABLE bucketed_table (
  column1 data_type,
  column2 data_type,  ...) 
CLUSTERED BY (column1) -- 分桶列INTO 10 BUCKETS; -- 桶数量

在上述示例中,我们将column1作为分桶列,并将数据分成10个桶。

加载数据时,Hive根据指定的桶列计算哈希值,并将数据存储在对应的桶中。

INSERT INTO TABLE bucketed_table VALUES ('value1', 1, 2.3)

查询时,可以使用以下格式指定桶列:

SELECT * FROM bucketed_table TABLESAMPLE(BUCKET x OUT OF y ON column1);

在上述示例中,我们使用用于抽样数据的 TABLESAMPLE 子句,指定从桶x中抽取数据,并在分桶列column1上进行抽样。

分桶表的优点在于,我们可以更容易地执行等值和范围查询,并更好地利用MapReduce 的数据本地性,从而提高查询性能。但分桶表也有一些缺点,例如添加和删除数据涉及重新计算哈希函数和移动数据的成本。

总之,分区表和分桶表都是Hive管理和处理大型数据集的重要工具,可以帮助我们更轻松地组织、查询和分析大量数据。在具体使用时,需要考虑表的存储和查询需求,选择最适合的表类型。在实际场景中分区用的居多。

关于Hive 分区和分桶的区别和示例讲解就先到这里了,有任何疑问欢迎给我留言,后续会持续更新相关文章,请小伙伴耐心等待

from:https://baijiahao.baidu.com/s?id=1764915870006249443&wfr=spider&for=pc

如何优化 Spark 小文件,Kyuubi 一步搞定!(转)


Hive 表中太多的小文件会影响数据的查询性能和效率,同时加大了 HDFS NameNode 的压力。Hive (on MapReduce) 一般可以简单的通过一些参数来控制小文件,而 Spark 中并没有提供小文件合并的功能。下面我们来简单了解一下 Spark 小文件问题,以及如何处理小文件。

01Spark 小文件问题

 1.1 环境 

Kyuubi 版本:1.6.0-SNAPSHOT

Spark 版本:3.1.3、3.2.0 

 1.2 TPCDS 数据集 

Kyuubi 中提供了一个 TPCDS Spark Connector,可以通过配置 Catalog 的方式,在读取时自动生成 TPCDS 数据。 

只需要将 kyuubi-spark-connector-tpcds_2.12-1.6.0-SNAPSHOT.jar 包放入 Spark jars 目录中,并配置:

spark.sql.catalog.tpcds=org.apache.kyuubi.spark.connector.tpcds.TPCDSCatalog;

这样我们就可以直接读取 TPCDS 数据集:

use tpcds; 

show databases; 

use sf3000; 

show tables; 

select * from sf300.catalog_returns limit 10;

 1.3 小文件产生

首先我们在 Hive 中创建一个 sample.catalog_returns 表,用于写入生成的 TPCDScatalog_returns 数据,并添加一个 hash 字段作为分区。

我们先关闭 Kyuubi 的优化,读取 catalog_returns 数据并写入 Hive:

Spark SQL 最终产生的文件数最多可能是最后一个写入的 Stage 的 Task 数乘以动态分区的数量我们 可以看到由于读取输入表的 Task 数是 44 个,所以最终产生了 44 个文件,每个文件大小约 69 M。 

 1.4 改变分区数(Repartition) 

由于写入的文件数跟最终写入 Stage 的 Task 数据有关,那么我们可以通过添加一个 Repartition 操作, 来减少最终写入的 task 数,从而控制小文件:

添加 REPARTITION(10) 后,会在读取后做一个 Repartition 操作,将 partition 数变成 10,所以最终 写入的文件数变成 10 个。 

 1.5 Spark AQE 自动合并小分区 

Spark 3.0 以后引入了自适应查询优化(Adaptive Query Execution, AQE),可以自动合并较小的分区。

开启 AQE,并通过添加 distribute by cast(rand() * 100 as int) 触发 Shuffle 操作:

默认 Shuffle 分区数 spark.sql.shuffle.partitions=200 ,如果不开启 AQE 会产生 200 个小文件, 开启 AQE 后,会自动合并小分区,根据spark.sql.adaptive.advisoryPartitionSizeInBytes=512M 配置合并较小的分区,最终产生 12 个文件。

02Kyuubi 小文件优化分析

Apache Kyuubi (Incubating) 作为增强版的 Spark Thrift Server 服务,可通过 Spark SQL 进行大规模的 数据处理分析。Kyuubi 通过 Spark SQL Extensions 实现了很多的 Spark 优化,其中包括了RepartitionBeforeWrite 的优化,再结合 Spark AQE 可以自动优化小文件问题,下面我们具体分析 一下 Kyuubi 如何实现小文件优化。

  2.1 Kyuubi 如何优化小文件

 Kyuubi 提供了在写入前加上 Repartition 操作的优化,我们只需要将 kyuubi-extension-spark-3- 1_2.12-1.6.0-SNAPSHOT.jar 放入 Spark jars 目录中,并配置spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension 。相关配置:

通过 spark.sql.optimizer.insertRepartitionNum 参数可以配置最终插入 Repartition 的分区数, 当不开启 AQE,默认为 spark.sql.shuffle.partitions 的值。需要注意,当我们设置此配置会导致 AQE 失效,所以开启 AQE 不建议设置此值。 

对于动态分区写入,会根据动态分区字段进行 Repartition,并添加一个随机数来避免产生数据倾斜,spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 用来配置随机数的范围,不 过添加随机数后,由于加大了动态分区的基数,还是可能会导致小文件。这个操作类似在 SQL 中添加distribute by DYNAMIC_PARTITION_COLUMN, cast(rand() * 100 as int)。

  2.2 静态分区写入 

开启 Kyuubi 优化和 AQE,测试静态分区写入:

可以看到 AQE 生效了,很好地控制了小文件,产生了 11 个文件,文件大小 314.5 M 左右。

 2.3 动态分区写入

我们测试一下动态分区写入的情况,先关闭 Kyuubi 优化,并生成 10 个 hash 分区:

产生了 44 × 10 = 440 个文件,文件大小 8 M 左右。

开启 Kyuubi 优化和 AQE:

产生了 12 × 10 = 120 个文件,文件大小 30 M 左右,可以看到小文件有所改善,不过仍然不够理想。

此案例中 hash 分区由 rand 函数产生,分布比较均匀,所以我们将spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 设置成 0 ,重新运行,同时将动态分区数设置为5 :

由于动态分区数只有 5 个,所以实际上只有 5 个 Task 有数据写入,每个 Task 对应一个分区,导致最终每个分区只有一个较大的大文件。

通过上面的分析可以看到,对于动态分区写入,Repartition 的优化可以缓解小文件,配置spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum=100 解决了数据倾斜问题, 不过同时还是可能会有小文件。 

  2.4 Rebalance 优化 

Spark 3.2+ 引入了 Rebalance 操作,借助于 Spark AQE 来平衡分区,进行小分区合并和倾斜分区拆 分,避免分区数据过大或过小,能够很好地处理小文件问题。 

Kyuubi 对于 Spark 3.2+ 的优化,是在写入前插入 Rebalance 操作,对于动态分区,则指定动态分区列 进行 Rebalance 操作。不再需要 spark.sql.optimizer.insertRepartitionNum 和spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 配置。

测试静态分区写入,使用 Spark 3.2.0 开启 Kyuubi 优化和 AQE:

Repartition 操作自动合并了小分区,产生了 11 个文件,文件大小 334.6 M 左右,解决了小文件的问题。

测试动态分区写入,使用 Spark 3.2.0 开启 Kyuubi 优化和 AQE,生成 5 个动态分区:

Repartition 操作自动拆分较大分区,产生了 2 × 5 = 10 个文件,文件大小 311 M 左右,很好地解决倾斜问题。

03总 结

从上面的分析可以看到,对于 Spark 3.2+,Kyuubi 结合 Rebalance 能够很好地解决小文件问题,对于 Spark 3.1,Kyuubi 也能自动优化小文件,不过动态分区写入的情况还是可能存在问题。

相关的配置总结:

更多 AQE 配置可以参考:How To Use Spark Adaptive Query Execution (AQE) in Kyuubi

原文连接:https://www.modb.pro/db/423478

【原创】用EMR建设实时数仓

建设实时数仓的目的和意义

实时数仓目的

数仓概念:数据尽可能多,保存时间尽可能久

图片描述

实时概念:数据流式,处理及时、瞬时、短时、事件或者微批响应

图片描述

数仓跟实时从概念上就有冲突,所以本质上不太适合处理广泛的问题,比如,对一个月,甚至是一年的数据进行统计计算。

图片描述

所以,实时数仓应该目前作为离线数仓的一种补充,解决因离线数仓实时性低而无法解决的问题,具体点说就是处理离线两个周期间隔的数据问题,不适合解决大批量数据聚合问题、业务性太强的以及对实时性要求很高问题。

实时数仓的意义

图片描述

实时数仓从概念上讲还是要靠近数仓的概念,数据分层,面向主题,数据尽可能集成,结构相对稳定,不易发生变化。

图片描述

对于实时数仓来讲,数据量不需要保存像离线那么久,上一节我们提到,实时数仓处理两个离线周期间隔的数据即可,如上图,以时报为例,实时数仓补充中间数据即可,以天为例,实时数仓最多只需要保留3~5天数据即可,能够支持一段时间的数据追溯和重导就可以了。

实时数仓可以解决哪类问题

图片描述

利用EMR建设实时数仓

实时数仓对比离线数仓

图片描述

实时数仓架构

图片描述

从图中可以看到,

1、ODS并不是实时数仓的一部分,是依赖外部数据源,比如binlog,流量日志,系统日志,或者是其他消息队列

2、应用层也不是实时数仓的一部分,对于数据的使用,通过实时数仓暴露Topic来使用

3、实时数仓要求层次要少,因为需要尽可能降低延迟

用EMR搭建实时数仓

图片描述

1、底层数据源可以接企业内部binlog、日志或者消息队列

2、从ODS层经过与维表轻度扩展,形成明细层明细表,明细表用一个Ckafka topic表示,计算采用Oceanus或者EMR FlinkSql 关联查询,维表采用EMR Hbase存储

3、从明细层经过进一步汇总计算,形成汇总层,此时数据已经是面向主题的汇总数据,就是传统意义上的大宽表,一个主题是一系列Ckafka topic,计算采用Oceanus或者EMR FlinkSql 关联查询以及汇总计算

实时数仓各层搭建

ODS层搭建

图片描述

1、 之所以没有把ODS层放在实时数仓的一部分,是因为实时数仓的ODS并不像离线数仓ODS是采集过来的原始数据,现在一般企业都已经具备了如上图的底层数据源

2、 Binlog,是数据库日志,通过Binlog可以自数据库主从间同步,可以同步关系型数据库数据,目前企业线上数据库都采用Mysql这样的数据库,可以通过抓取Mysql binlog 获取数据库变更信息,数仓中重要的业务数据,支付相关,用户相关,管理相关数据一般都从这种数据源获得

3、 Log日志,服务器日志,像服务器系统日志采集,都是通过这种形式进行采集

Ckafka,企业通过消息队列提供数据源服务,比如,点击流服务,会把用户点击事件通过上报服务器上报到Ckafka,为后续分析提供原始数据

该层搭建的注意点:
1、 业务选择数据源,尽量跟离线保持一致,比如某个业务,数据源即可以通过Binlog,也可以通过Log日志采集,如果离线数仓业务是通过Binlog,那么实时数仓也取Binlog,否则后续产生数据不一致,非常难以定位

2、数据源要求一致性,对于Ckafka和Binlog 需要进行分区一致性保证,解决数据乱序问题

明细层搭建

图片描述

建设标准与离线数仓目标一致,解决原始数据存在噪音,不完整,形式不统一等问题

图片描述

数据解析,业务整合,数据清洗,解决噪音,不完整,数据不一致问题;模型规范化(提前指定号规则,尽量跟离线保持一致),形成数据规范,规范尽可能跟离线保持一致,命名,比如,指标命名等;

与离线数仓不同之处在于,离线调度是有周期的,时报一小时,天报周期为一天,如果修改数据表字段,只要任务没开始,就可以修改,而实时是流式,7X24小时不间断运行的,想要修改流中的字段或者格式,对下游影响是不可预估的

实时数仓如果修改字段不像离线,在间隔期间通知下游把作业都改了就没事了,但是实时不一样,实时你改掉了字段,下游作业必须可以认识你修改的内容才行,kafka不是结构化存储,没有元数据的概念,不像Hive,如果表名不规范,找一个统一时间,把catolog改规范,然后把脚本一改就就解决了。

明细建设关键,我们会在每一条数据上增加一些额外字段到数仓里

图片描述

举例说明这些额外字段的意义

事件主键:对于上游数据重复问题,我们会根据一些数据内的字段来判断上有数据的唯一性,比如binlog,<集群id_><库id_><表id_>数据id_数据生成时间。

数据主键:唯一标识数据表的一行记录,可以使用数据库主键,主要用来解决分区一致性及分区有序。

数据元数据版本:上面介绍了,流式计算是7X24小时不间断的运算,当修改了数据结构,增加,删除了字段,对下游的影响是不可预估的,因此元数据变更需修改该字段,保持数据流中新老版本数据双跑,下游选择合适的时机进行数据切换。

数据批次:跟元数据用途相似,当明细层逻辑发现问题,需要重跑数据,为了对下游任务不产生影响,调整了明细层逻辑后,需要回倒位点重跑数据,同时需要跟老逻辑任务双跑,待下游业务都切换到新的逻辑后,老逻辑任务才可以停止。

还有一个思路,可以直接把明细层数据,也可以直接写到druid 直接用于分析。

维度层搭建

图片描述

维度数据处理:

图片描述

如上图,对于变化频率低,地理,节假日,代码转换,直接同步加载到缓存里,或者是新增数据,但是增加进来就不变了,通过数据接口,访问最新数据,然后通过本公司数据服务对外提供数据

图片描述

如上图,对于变化频率高,比如商品价格,也是需要监听变化消息,然后实时更新维度拉链表。对于比如像最近一个月没有消费用户这样的衍生维度,是需要根据变化消息,通过计算得到的衍生维度拉链。

因为维度数据也在发生变化,为了能够让源表数据匹配到维表,我们会给维表增加多版本minversion,然后通过TIMERANGE => [1303668804,
1303668904]筛选出源数据指定的维表版本数据。

这里有些同学可能觉得如果版本一致保存下去,会不会非常大,是的,我们响应的需要配置TTL保证维表数据量可控,上文我们介绍过,实时数仓解决是离线数仓两个间隔的问题,那么像这种变化频繁的数据我们TTL设置一周足够了。

关于源表与维表如果进行join,Flink原生sql以及Oceanus都是采用UDTF函数以及Lateral
Table 进行联合使用,其中UDTF我们可以实现查询数据服务获取维表数据的能力,Oceanus请参考相关材料。

汇总层搭建

图片描述

汇总层加工其实跟离线数仓是一致的,对共性指标进行加工,比如,pv,uv,订单优惠金额,交易额等,会在汇总层进行统一计算。

Flink提供了丰富的窗口计算,这使得我们可以做更细力度的聚合运算,例如,我们可以算最近5分钟,10分钟的数据聚合,根据时间窗口的间隔,也需要调整相应的TTL,保障内存高效实用。

Flink提供了丰富的聚合计算,数据都是要存在内存中的,因此需要注意设置state的TTL,例如,做Count(Distinct
x)。或者在进行PV,UV计算时候,都会使用大量的内存,这一块,当处理的基数比较大的时候,推荐使用一些非高精度去重算法,Bloom过滤器,Hyper LogLog等。

汇总层也需要在每一条数据上增加一些额外字段到数仓里,这块与明细层一致,就不在单独讲解了。

数据质量保证

图片描述

对于实时数仓数据质量的管理,我们通常由三步操作组成

第一步,数据与离线数据进行对比

首先,将汇总层数据Topic通过平台接入任务接入到离线仓库,然后通过数据质量任务,定时对实时数仓和离线数仓数据进行对比,并配置报警,数据差异,数据波动等。

第二步,配置报警,我们会在明细层以及汇总层,Topic配置生产监控,与以往数据波动,上游数据延迟或者积压,都需要进行报警。

第三部,构建实时血缘, Flink 在读取数据时候,会把信息读到flink catalog 这样就知道这个任务读取了哪个表,在解析客户DDL代码时,可以获得目标表信息,同步到我们的元数据服务。

参考文献:

美团实时数仓搭建:https://tech.meituan.com/2018/10/18/meishi-data-flink.html

菜鸟实时数仓:https://mp.weixin.qq.com/s/9ZRG76-vCM7AlRZNFCLrqA

Spark 实践 | B站离线计算的实践

1. 背景介绍

2018年B站基于Hadoop开始搭建离线计算服务,计算集群规模从最初的两百台到发展到目前近万台,从单机房发展到多机房。我们先后在生产上大规模的使用了 Hive、Spark、Presto 作为离线计算引擎,其中 Hive 和 Spark 部署在 Yarn 上,具体的架构如下,目前每天有约20w的离线批作业运行在 Spark 和 Hive 上,下面介绍下我们做了哪些工作来确保这些作业的高效与稳定。

2. 从Hive到Spark

21年初的时候Hive还是B站主要的离线计算引擎,80%以上的离线作业使用 Hive 执行,Spark2.4作业占比接近20%,集群资源的使用率长期维持在80%以上。21年3月 Spark3.1 发布,相较于 Spark2.4 性能有了较大的提升,我们开始推动Spark3.1 在B站的落地,同时将 Hive-SQL 整体迁移至 Spark-SQL。

在B站,离线计算的调度已经完成了收口,80%以上的作业来自于自建的 BSK 调度平台,其余的作业基本也都是 airflow 提交上来的,只有少量的任务来自散落的开发机。在推动 Hive 升级 Spark 时只要将调度平台的作业完成迁移就可以覆盖90%以上的作业。起步阶段我们进行了少量的人工迁移,对用户 SQL 进行了简单改写,修改了输入输出表后由两个引擎执行,开发了一个结果对比的工具,通过对双跑结果分析保障迁移效果。基于这个操作链路我们自研了一个自动迁移工具,减少人工失误和人力压力。

2.1 语句转换

我们重写了 SparkSqlParser,将从调度系统中收集到的 SQL 进行输入输出表的替换,避免对生产环境的影响。调度平台进行作业调度时以 DAG 为单位,一个调度任务里面可能存在多条 SQL,这些 SQL的输入输出表间存在依赖关系,为了保证双跑尽可能的模拟生产表现,对一个 DAG 里面的多个调度作业进行输入输出表替换时进行整体替换,保证了相互间依赖。对于 Select语句因为本身没有输出表,需要将 Select 语句转换为 CTAS 语句,这样就能将执行结果落地进行对比,需要注意的是转换过程中要将列名进行编码防止中文列导致的建表失败。当迁移工具识别出 SQL 语句为 DDL 语句,如果不是 CTAS 这种需要消耗计算资源的就直接跳过对比,同时对该语句进行标记,保证交由 Hive 执行,防止意外的元信息修改。

2.2 结果对比

双跑输出结果的对比是保证数据准确性的关键。首先对两个结果表的 Schema 进行对比,这个通过调用 DESC 语法返回结果对照就可以完成。对于 Schema 一致的两个表则进行下一步操作,两表全量数据对比,我们设计了一个 SQL 对数据按行进行整体对比,具体的对比思路如图:

第一步将两表按所有列(这里是 name 和 num 字段)进行 GROUP BY,第二步 UNION ALL 两表数据,第三步再按所有列(这里是 name, num 和 cnt 字段) GROUP BY 一次产生最终表,在最终表中 cnts 值为2的行表示这行数据在两表中都有且重复值一致,对于值非2的数据就是差异行了。从上图的例子来说差异行 Jack|1|2|1 表示 Jack|1 这行数据数据在一个表中存在两行,结合差异行 Jack|1|1|1 来看其实就是 Jack|1 这行数据一个表有一行另一个表有两行。通过这个方式就可以对双跑产出的结果表进行一个全量的对比。通过这种结果对比方法可以完成大部分双跑任务的结果对比,但是对于结果表中存在 LIST、SET、MAP 这种容器类型的,因为在 toString 时顺序是无法保证的,所以会被识别为不一致,此外对于非稳定性的 SQL 如某列数据是 random 产生,因为每次执行产出的结果不一致,也会识别为对比失败,这两种情况下就需用人工的介入来分析了。

资源利用率的提升是做引擎升级的出发点,除了结果对比来保证数据准确性,我们还做了资源消耗对比来保证迁移的收益。对比系统收集了每个作业的执行时间以及消耗的资源,从执行时间、CPU 和内存的资源消耗进行两个引擎执行性能的对比,在执行最终迁移前依据收集的数据为用户提供了迁移的预期收益,提高了用户迁移任务的积极性。从迁移中收集的数据来看 hive 切到 spark 可以减少40%以上的执行时间,同时整体资源消耗降低30%以上。

2.3 迁移&回滚

迁移系统对每个任务都执行了至少3次的双跑对比,但依然不能完全消除执行迁移的风险,在实际迁移过程中的几次问题都是迁移后稳定性不符合预期导致的,因此迁移系统对于迁移后的任务增加了监控,在一个任务迁移后,该任务的前3次调度执行消耗的时间、CPU 和内存资源将被用来和迁移前的七次平均执行数据对比,如果存在负优化的情况则会将这个任务执行引擎进行回滚并通知我们介入进行进一步分析。

3. Spark 在B站的实践

3.1 稳定性改进

3.1.1 小文件问题

随着B站业务高速发展,数据量和作业数增长越来越快,伴随而来的小文件数也快速增长,小文件太多会增加 HDFS 元数据的压力,在计算引擎读取时也大大增加了读请求的数量降低了读取效率。为了解决小文件的问题,在写表场景下对 Spark 做了如下两种改造。

  • 兜底小文件合并:我们修改了数据的写出目录,引擎计算先写到一个中间目录,在 FileFormatWriter.write 结束后 refreshUpdatedPartitions 前,插入了一个文件合并逻辑,从中间目录中获取分区下文件的平均大小,对于不存在小文件情况的目录直接MV到最终目录,对于存在小文件的目录新增一个读 RDD coalesce 到一个合适值写出后 MV 到最终目录。
  • 基于 reparation 的小文件合并:可以看到兜底小文件合并方式需要先将数据落地到 HDFS,重新读取后再写出,这样做放大了 HDFS写操作(三副本),降低了计算引擎的执行性能。而 Spark3的 AQE 特性可以在有 shuffle 的场景下有效解决小文件的问题,很多情况下对于没有 shuffle 的场景新增一个 reparation 操作就可以借助 AQE 的能力解决小文件的问题。社区 AQE 对于 reparation 这个 hint 是不会调整目标分区数的,我们新增了一个 rebalance hint,本质上和reparation 一样只是将 AQE 的特性应用在了这个操作上,同时将 AQE 目标 size 相关的属性和 rebalance 设置属性做了隔离方便更好的设置文件大小而不影响计算的并行度。rebalance 操作会在最终写出前增加一个 shuffle stage,有些情况下没有这个 stage 上游输出就已经没有小文件了,为此作业是否增加 rebalance 操作依赖于我们对任务的画像通过 HBO 系统开启。

3.1.2 shuffle 稳定性问题

Shuffle 稳定性直接影响了 Spark 作业的 SLA,在B站推动 Spark 升级过程中成为用户顾虑的点。

  • shuffle 磁盘分级:B站 Yarn 主集群采用 DataNode 和 NodeManage 混部模式,节点配置了多块 HDD 盘和少量 SSD 盘,NM 以 HDD 盘作为计算盘,由于和 DN 没有做到 IO 隔离,DN 和shuffle service 经常互相影响,因此我们对DiskBlockManager 进行了改造,优先使用 SSD 盘下的目录作为工作目录,当 SSD 盘存储空间或者 inode 紧张时则降级到 Yarn 配置的计算目录,借助 SSD 优异的随机 IO 能力,有效的提高的了 shuffle 稳定性。
  • remote shuffle service:push based shuffle 方案可以大量降低磁盘随机IO读请求,如下图:

通过中间服务将同属一个分区的数据进行归并,后续 reduce 操作就不需要从上游所有的 Map 节点拉取数据,在 shuffle 上下游 Task 数量多的情况下会对磁盘 IO 压力指数放大,生产上 shuffle heavy 的任务表现很不稳定,经常出现FetchFailed Exception。B站在推动 RSS 落地时选择了社区3.2 Push based shuffle 的方案,这个方案主要的优点是对 AQE 支持比较好,缺点是因为本地也要写一份数据放大了写。将数据先写本地后异步的发送到 driver 维护的 executor 节点的 external shuffle 节点上,后续生产实践中该方案有个问题,就是当作业启动时通常 driver 维护的 executor 数不足以满足远程节点的选择,而 SQL 作业参与计算的数据量通常是随着过滤条件层层递减的,通常 shuffle 数据量大的时候因为没有足够的节点会 fall back 到原先的 shuffle 方式,为了解决这个问题,我们新增了 shuffle  service master 节点,具体调用流程如下图,所有的 external shuffle 节点启动时都会注册到 shuffle master 节点上,后续节点本身也会周期性的上报心跳和节点繁忙程度,DAGScheduler 后续请求远程节点都从 shuffle master 申请,这样不仅解决了冷启动节点不足的问题,在节点选择上也考虑了节点的健康程度。因为是先落盘后发送,在 stage 执行结束后会有一个等待时间,这里面会有个性能回退的问题,对小任务不友好,所以在生产应用中我们基于任务画像系统 HBO 自动决定任务是否启用RSS服务,目前生产大约7%的大任务在使用RSS 服务,这些任务平均执行时间缩短了25%,稳定性有了显著提升。

目前B站生产中使用该方案基本解决了 shuffle 稳定性的问题,不过这套方案依旧需要计算节点配置本地 shuffle 盘,在本地落 shuffle 数据,无法支持存算分离的架构。后续我们在 k8s 上会大规模上线混部集群,需要尽量不依赖本地磁盘,避免对在线应用的影响,我们也关注到腾讯和阿里相继开源各自的 RSS 方案,我们也在尝试在生产中使用纯远程 shuffle 方案来满足 Spark on K8s 的技术需要。

3.1.3 大结果集溢写到磁盘

在adhoc 场景中用户通常会拉取大量结果到 driver 中,造成了大量的内存消耗,driver 稳定性又直接影响着用户即席查询的体验,为此专门优化了 executor fetch result 的过程,在获取结果时会监测 driver 内存使用情况,在高内存使用下将拉取到的结果直接写出到文件中,返回给用户时则直接分批从文件中获取,增加 driver 的稳定性。

3.1.4 单 SQL task 并行度、task 数、执行时间限制

生产上我们按队列隔离了用户的 adhoc 查询,在实践过程中经常性的遇到单个大作业直接占用了全部并行度,有些短作业直接因为获取不到资源导致长时间的 pending 的情况,为了解决这种问题首先对单个 SQL 执行时间和总 task 数进行了限制,此外考虑到在 task 调度时有资源就会全部调度出去,后续 SQL 过来就面临着完全无资源可用的情况,我们修改了调度方法对单个 SQL 参与调度的 task 数进行了限制,具体的限制数随着可用资源进行一个动态变化,在 current executor 数接近于 max executor 的情况下进行严格限制 ,在 current executor 数明显少于 max executor 的情况下,提高单 SQL 并行的 task 总数限制。

3.1.5 危险 join condition 发现& join 膨胀率检测

  • 危险 join condition 发现

在选择 join 方式的时候如果是等值 join 则按照 BHJ,SHJ,SMJ 的顺序选择,如果还没有选择出则判断 Cartesian Join,如果 join 类型是 InnerType 的就使用 Cartesian Join,Cartesian Join 会产生笛卡尔积比较慢,如果不是 InnerType,则使用 BNLJ,在判断 BHJ 时,表的大小就超过了 broadcast 阈值,因此将表 broadcast 出去可能会对 driver 内存造成压力,性能比较差甚至可能会 OOM,因此将这两种 join 类型定义为危险 join。

如果不是等值 join 则只能使用 BNLJ 或者 Cartesian Join,如果在第一次 BNLJ 时选不出 build side 说明两个表的大小都超过了 broadcast 阈值,则使用 Cartesian Join,如果 Join Type 不是 InnerType 则只能使用 BNLJ,因此 Join 策略选择Cartesian Join 和第二次选择 BNLJ 时为危险 join。

  • join 膨胀率检测

ShareState 中的 statusScheduler 用于收集 Execution 的状态和指标,这其中的指标就是按照 nodes 汇总了各个 task 汇报上来的 metrics,我们启动了一个 join 检测的线程定时的监控 Join 节点的 “number of output rows”及 Join 的2个父节点的 “number of output rows” 算出该 Join 节点的膨胀率。

  • 倾斜 Key 发现

数据倾斜是 ETL 任务比较常见的问题,以 shuffle 过程中的倾斜为例,通常有以下几个解决方法:增大 shuffle 的分区数量从而使数据分散到更多的分区中;修改逻辑,将 shuffle 时的 key 尽可能打散;单独找出产生了极大倾斜的 key,在逻辑中单独处理等等。但在进行这些处理之前,我们都需要先知道倾斜发生在 SQL 逻辑的哪个部分以及发生倾斜的是哪些 key。为了帮助用户自助高效的解决数据倾斜问题,我们实现了倾斜 key 发现的功能。以 SortMergeJoin 为例,在 shuffle fetch 阶段,首先根据 mapStatuses 计算出每个 partition size,并根据一定策略判断该 task 所处理的 partition 是否倾斜。如果倾斜,则在 join 阶段对数据进行采样,找到发生倾斜的 key,通过 TaskMetric 发送到 driver 端,driver 端消费 metric后会记录倾斜信息。

上面这些 bad case 在运行时发现后会自动将信息发送到我们内部作业诊断平台,用户可以查看并对语句做优化和改进。

3.2 性能优化

3.2.1 DPP 和 AQE 兼容

spark3.1 的 DPP 和 AQE 存在兼容问题,在使用 AQE 后 DPP 的策略就无法生效,这个问题在3.2得到了修复,我们将3.2的相关代码 backport 回来,从 TPCDS 测试上看对3.1有很明显的提升。

3.2.2 AQE 支持 ShuffledHashJoin

AQE 通过对 map 阶段收集的指标数据来优化 Join 方式,对于存在小表的情况能将 SMJ 优化为 BHJ,这个操作可以显著的优化性能。Spark的 shuffle 策略还有一个就是 ShuffledHashJoin,该策略性能相对较好,但内存压力大,在默认情况下为了保证任务的稳定性我们将其关闭,基于 AQE 的思想,在 map 完成后收集 partition size,当最大的 partition size 小于定义的值后,通过新增 DynamicJoin 优化策略将 SMJ 优化为 SHJ。

3.2.3 Runtime filter

DPP 通过对大表直接进行 partition 级别的裁剪,可以大大提高查询速度,但 DPP 的适用条件也相对严格,需要大表的分区列参与 join,但如果大表参与 join 的列为非分区列则无法应用。我们知道 shuffle 是比较耗时的操作,shuffle 的数据量越大,耗时越久,而且对网络,机器 IO 都会产生比较大的压力。如果能在大表 shuffle 前根据非分区列的 join 列对其进行过滤,即使无法像 DPP 一样直接减少从存储中读取的数据量,但减小了其参与 shuffle 以及后续操作的数据量,也能获得比较不错的收益,这就是 runtime filter 的动机,即运行时预先扫描小表获取 join 列的值,构造 bloom filter 对大表进行过滤。具体实现思路和 DPP 基本一致,首先在 SparkOptimizer 新增 DynamicBloomFilterPruning 规则,逻辑上类似PartitionPruning,符合一系列判断条件后插入一个节点 DynamicBloomFilterPruningSubquery。与 DPP 不同的是,如果 join 可以被转化为 BroadcastHashJoin,则不会应用该规则,因为在 BroadcastHashJoin 的情况下对大表进行预先的过滤其实是多余的(非 pushdown 的情况下)。判断是否加入 filter 节点的主要逻辑如下,这里以裁剪左表(左右两侧都为 logicalPlan,为了方便表达,用左右表指代)为例进行说明,需要满足以下条件:

  • 右表 rowCount 需要小于左表
  • Join 类型支持裁剪左表
  • 右表 rowCount > 0
  • 右表 rowCount 小于 spark.sql.optimizer.dynamicBloomFilterJoinPruning.maxBloomFilterEntries,默认值为100000000,避免 bloom filter 占用内存过大
  • 右表中没有DynamicBloomFilterPruningSubquery
  • 右表不是 stream 且存在 SelectivePredicate
  • 左表(这里的左表是真正的左表或者包含左表的Filter节点)没有 SelectivePredicate,因为如果存在 SelectivePredicate,那么下一步便无法根据统计信息去计算过滤收益

在 prepare 阶段,PlanAdaptiveSubqueries 会把 DynamicBloomFilterPruningSubquery 节点替换为 DynamicPruningExpression(InBloomFilterSubqueryExec(_, _, _)),扩展了PlanAdaptiveDynamicPruningFilters,支持对以上节点进行处理。新增了 BuildBloomFilter 和 InBloomFilter 两个 UDF。BuildBloomFilter 在 sparkPlan prepare 阶段提交任务构造 BloomFilter 并 broadcast 出去,具体的 evaluate 逻辑还是交给 InBloomFilter。另外在 AQE 的reOptimize 阶段也新增了规则 OptimizeBloomFilterJoin,这个规则主要是用来根据执行过程的 metric 信息更新BuildBloomFilter的expectedNumItems。

可以看到在开启了runtime filter后数据量在join前从120亿条降至3W条,收益还是相当明显的。

3.2.4 Data skipping

目前B站离线表存储主要使用 orc、parquet 格式,列式存储都支持一定程度的 data skipping,比如 orc 有三个级别的统计信息,file/stripe/row group,统计信息中会包含count,对于原始类型的列,还会记录 min/max 值,对于数值类型的列,也会记录 sum 值。在查询时,就可以根据不同粒度的统计信息以及 index 决定该 file/stripe/row 是否符合条件,不符合条件的直接跳过。对于统计信息及索引的细节见orc format  (https://orc.apache.org/specification/ORCv1/)  和 orc index (https://orc.apache.org/docs/indexes.html)  。Parquet 与 orc 类似,也有相应的设计,具体见parquet format (https://github.com/apache/parquet-format)  和 parquet pageIndex (https://github.com/apache/parquet-format/blob/master/PageIndex.md)  。虽然 orc/parquet 都有 data skipping 的能力,但这种能力非常依赖数据的分布。前面提到统计信息中会包含每一列的 min/max 值,理论上如果查询条件(比如> < =)不在这个范围内,那么这个file/stripe/row group 就可以被跳过。但如果数据没有按照 filter 列排序,那最坏的情况下,可能每个 file/stripe/row group的min/max 值都一样,这样就造成任何粒度的数据都不能被跳过。为了增加列式存储 data skipping 效果,可以通过对数据增加额外的组织,如下:

 select     count(1)   from     tpcds.archive_spl_cluster   where     log_date = '20211124'     and state = -16

表 archive_spl,不调整任何分布与排序

表 archive_spl_order,order by state,avid

通过对 state 进行 order 后 scan 阶段数据量直接从亿级别降至数十万级别。在生产中我们通过对 SQL 进行血缘分析找到那些热点表及高频 filter 列,将这些热列作为 table properties 存入 hms 中,在 Spark 执行时根据从 hms 中获取的列信息,通过相应的优化规则,物理计划自动增加 sort 算子,完成对数据组织。这个方案是基于列存优化数据组织来进行 data skipping,目前我们也在往索引方向上进一步探索。

3.3 功能性改进

3.3.1 对于ZSTD的支持

Spark 社区在3.2版本全面支持了 ZSTD 压缩,为了更好的使用 ZSTD,我们在 Spark3.1  的基础上引入了社区的相关 patch。其中也遇到了一些问题。在测试 ZSTD 的过程中偶然发现下推到 ORC 的过滤条件没有生效,经调查发现是 ORC 代码的 bug,在和社区讨论之后,我们修复了该 bug并将 patch提交给了社区:https://issues.apache.org/jira/browse/ORC-1121 。

离线平台的 Presto 也承接了很多 ETL 任务,由于 Presto 使用的是自己实现的 ORC reader/writer,所以在 Spark 升级 ORC 版本之后,对一些 Presto 写出的表,出现了查询结果错误的问题。正常情况下,Apache ORC writer 在写文件时会记录每个 stripe/rowGroup 中每列的统计信息,如 min/max 等。Apache ORC reader 在读取文件时会根据这些统计信息结合下推的过滤条件进行 stripe/rowGroup 级别的过滤。但 Presto ORC writer 在写文件时,如果 String 类型的列长度超过64 bytes,则这一列不会记录 min/max 信息。虽然 Presto ORC reader 可以正常处理这类文件,但 Spark/Hive 使用的 Apache ORC reader 因为无法正常的反序列化 columnStatistics 得到正确的统计信息,导致做 stripe/rowGroup 级别的过滤时出现了错误的结果。我们也发现这个问题是由于 ORC 1.6 版本的一次代码重构导致,1.5及之前是没有该问题的。我们已在内部分支修复了该问题,也已将问题反馈给社区。

3.3.2 多格式混合读兼容

历史上很多表使用了 text 存储,在资源上造成了很大的浪费,通过修改表的元信息可以保障新增分区切换到列存,这就造成了一个离线表可能存在多种 fileformat 的情况,为了兼容我们修改了 DataSourceScanExec 相关的逻辑,将reader 的实例化从基于table元信息粒度细化到分区元信息粒度。

3.3.3 转表&小文件合并语法

为了方便用户修改表的存储格式和文件压缩格式我们在引擎层提供了相关语法及具体实现。用户可以通过指定分区条件对特定分区进行转换。

CONVERT TABLE target=tableIdentifier        (convertFormat | compressType)  partitionClause?               #convertTableMERGE TABLE target=tableIdentifier        partitionClause?                                               #mergeTable

3.3.4 字段血缘

作业间的依赖关系分析、数据地图等业务都需要SQL血缘的支持,团队后续工作(z-order , analyze , index)也需要依赖血缘,我们通过注册一个 LineageQueryListener 继承 QueryExecutionListener,在 onSuccess 方法拿到当前执行的QueryExecution,通过 analyzedLogicalPlan,利用 NamedExpression 的 exprId 映射关系,对其进行遍历和解析,构建出字段级血缘(PROJECTION/PREDICATE)和 levelRelation(层级关系)。

3.4 基于历史执行的自动参数优化(HBO)

Spark 提供了大量的参数设置,对于用户而言了解这些参数并使用好需要花费很大的代价,在很多情况下不同的参数调优对于 spark 的作业执行和资源消耗会有很大差异。为了尽可能的适配任务执行,我们预设了一组参数,这种统一配置存在很多问题,以内存而言为了适配尽可能多的任务,该值设置偏大,通过对执行的分析发现大量的任务存在资源浪费的问题,整体的内存利用率仅20%左右。要求每个用户成为专家对作业进行细致的调优显然不可能,因此我们设计了 HBO 系统,具体的思路如下图:

首先对任务执行的 SQL 进行了指纹计算,通过指纹来标识该任务每天执行情况,将每次执行中采集到的 metrics 收集后用策略进行分析给出相应的参数优化建议,在下次执行的时候根据指纹来获取推荐的执行参数,对于使用默认参数的任务则进行覆盖,对于那些用户指定的参数则优先使用用户参数。

  • 内存优化策略:通过收集每个 executor 的峰值内存,如果峰值内存占配置内存比值低于30%,就推荐使用更少的内存来执行此次的计算,对于峰值内存占比过高的任务,则调大内存配置。通过这个策略生产上的内存使用率提升至50%左右。
  • 并行度优化策略:生产上开启了动态资源配置,在对数据分析时发现有些节点从分配后就没有task执行过,完全浪费了节点的资源,对于这些任务会在下次执行的时候降低 spark.dynamicAllocation.executorAllocationRatio 值来降低执行并行度,此外默认提供的 spark.sql.shuffle.partitions 值对于大任务来说执行并行度不够,后续也将进行自动的调整。
  • 优化shuffle策略:如上文所讲 RSS 对小任务存在性能下降的问题,通过对 block size、shuffle 数据量的分析,HBO 系统只会对那些 shuffle heavy 任务开启使用 RSS 服务。
  • 小文件合并策略:小文件合并会消耗额外的资源,对于不存在小文件情况的作业 HBO 系统会关闭小文件合并相关的配置。

此外平时工作中一些 feature 的上线也会依赖该系统进行一个灰度过程。

3.5 Smart Data Manager (SDM) 

Smart Data Manager(SDM)是我们自研的一个对数据进行组织和分析的服务,通过对数据的额外处理将我们对 Spark 的一些技改真正落地。它的整体架构如图,目前提供了如下的几个数据组织和分析能力:

  • 表存储和压缩方式的转换:将表从 Text 存储转换为 ORC 或 Parquet 存储,将压缩类型从 None 或 Snappy 转换为 ZSTD 可以带来不错的存储和性能收益,SDM 提供了按分区对表异步进行转换的能力。
  • 数据重组织:在分区内部按列对数据进行 order/zorder 组织可以有效的提高 data skipping 的效果,新增分区通过查询 table properties 中的排序列 meta 来改写执行计划应用,存量分区就可以通过 SDM 重刷。
  • Statistics 的统计:开启 CBO 时需要依赖对表统计信息的收集,在对 hive 表的列进行索引时也依赖收集到的列基数和操作信息选择合适的索引类型,通过 sdm 监听 hms 的 partition 事件就可以在分区更新时异步完成信息采样。
  • 小文件合并:对有小文件较多的分区和表异步进行小文件合并,减少 namenode 的压力
  • Hive 表索引:通过分析血缘信息得到热表热列上的高频操作(点查,范围查询),基于此在分区文件层面异步的建立索引来加速查询。
  • 血缘解析:解析语句,分析字段血缘,吐出 UDF 血缘、算子(order by / sort by / group by…)影响关系等

对数据进行重组织时会涉及到对数据的读写,为了防止对生产作业的影响我们在进行操作时会修改相关表的 Table Properties 增加锁表标记,各个计算引擎适配实现了类 Hive 的锁管理机制,由 Hive metastore 统一作为 lock manager,在对表和分区并发操作场景下,做到对用户完全透明。

4. Hive Meta Store 上的优化

B站使用 HMS(Hive MetaStore)管理所有的离线表元信息,整个的离线计算的可用性都依赖 HMS 的稳定性。业务方在使用分区表时存在不少4级及以上分区的情况,有多个表分区数超百万。分区元信息庞大单次分区获取代价高,原生 HMS 基于单个 MySQL 实例存在性能瓶颈。

4.1 MetaStore Federation

随着多机房业务的推进,独立业务的 HDFS 数据和计算资源已经迁移到新机房,但是 HIVE 元数据仍在原有机房的 Mysql 中,这时候如果发生机房间的网络分区,就会影响新机房的任务。

为了解决上述问题,我们进行了方案调研,有两种方案供我们选择:

  • WaggleDance
  • HMS Federation

4.1.1 WaggleDance

WaggleDance是开源的一个项目(https://github.com/ExpediaGroup/waggle-dance),该项目主要是联合多个 HMS 的数据查询服务,实现了一个统一的路由接口解决多套 HMS 环境间的元数据共享问题。并且 WaggleDance 支持 HMS Client的接口调用。主要是通过 DB,把请求路由到对应的 HMS。

4.1.2 HMS Federation

HMS Federation 是解决多机房场景下的 HIVE 元数据存储问题,HIVE 元数据和 HDFS 数据存储在同一个机房,并且允许跨机房访问 HIVE 元数据。比如主站业务的 HDFS 数据存放在 IDC1,那么主站业务 HDFS 数据对应的 HIVE 元数据就存在IDC1 的 Mysql,同样直播业务的 HDFS 数据和 HIVE 元数据都存放在 IDC2。

同时 HMS Federation 也提供了 Mysql 的横向扩容能力,允许一个机房可以有多个 Mysql 来存放 HIVE 元数据,如果单个 Mysql 的压力过大,可以把单个 Mysql 的数据存放到多个 Mysql 里面,分担 Mysql 的压力。比如主站业务的 HIVE 库,zhu_zhan 和 zhu_zhan_tmp,可以分别放在 idc1-mysql1 和 idc1-mysql2。

我们在 HMS Federation 中加入了一个 StateStore 的角色,该角色可以理解为一个路由器,HMS 在查询 Hive 库/表/分区之前,先问 StateStore 所要访问的 HIVE 元信息存放在哪一个 Mysql 中,获取到了对应的 Mysql 后,构建相应的ObjectStore,进行 SQL 拼接或者是利用 JDO 查询后端 Mysql。

4.1.3 HMS Federation 与 WaggleDance 的对比

数据迁移

我们的主要目的是实现 HIVE 元数据按业务划分到各自 IDC 的 Mysql

  • WaggleDance 并没有提供相应元数据迁移工具,要迁移需要停止整个 HIVE 库新建表/分区,才能够开始迁移过去,对业务影响较大。
  • HMS Federation 可以按表的粒度迁移,对业务影响较小,并且可以指定某个 HIVE 库下,新建表在新的 Mysql,旧的等待着锁表迁移。

运维复杂度

  • WaggleDance 方案需要不同的 HMS,配置不同的 Mysql 地址,增加了 HMS 配置的复杂度。WaggleDance 是一个独立的服务,为了保证可用性,运维复杂度会再一次提升。
  • HMS Fedration 是 HMS 的功能升级,在 HMS 代码上开发,并且使用统一的配置。

综合上述对比,我们最终选择了 HMS Federation 的方案。通过修改 HMS 的代码,实现元数据跨 Mysql 存储。

4.2 MetaStore 请求追踪和流量控制

HMS 在处理 getPartitions 相关请求的时候,如果拉取的分区数量非常多,会给 HMS 的堆内存,以及后端的 Mysql 带来很大的压力,导致 HMS 服务响应延迟。

为了能够快速的定位到有问题的任务,我们在 Driver 中将 Job 相关的信息保存到 Hadoop CallerContext 中,在调用 HMS 接口的时候将 CallerContext 中的相关属性设置到 EnvironmentContext 中透传到 HMS 端,同时扩展了所有getPartitions 相关的接口支持传递 EnvironmentContext,EnvironmentContext 中的 properties 会在 HMS 的 audit log 中打印出来,方便问题任务的定位。

同时为了提高 HMS 服务的稳定性,我们在 HMS 端也做了接口的限流和主动关闭大查询。对于限流,我们新增了一个 TrafficControlListener,当接口被调用的时候会以 function 和 user 为单位记录 Counters 保存在该 Listener 中,同时在该Listener 中启动采集 used memory 和 counters 的线程,当平均使用内存达到阈值时,检查接口的QPS,如果qps达到阈值会让调用接口的线程 sleep 一段时间,下一次检查通过或者达到最大等待时间后放行。HMS 也有可能因为 getPartitions 方法返回的分区数量太大导致内存被打满,一方面我们限制了 getPartitions 从 mysql 返回的分区数量,超过一定数量就直接拒绝该请求,另一方面我们在 TProcessor 中以 threadId 和 socket 为 key 和 value 保存当前的连接,在检查 partition 数量时我们也按照 threadId 和 num partitions 为 key 和 value 保存 partition 的 cost,当 HMS 平均使用内存达到阈值超过一定时间后,会选择 num partitions 最大的 threadId,再根据 threadId 获取对应的连接,主动 close 该连接,来缓解内存压力。

5. 未来的一些工作

  • 调研不落地的 Remote Shuffle Service 来更好的适配 K8S 混部的场景
  • 使用向量化技术加速 Spark 的执行引擎,提升计算性能
  • 增强自动排错诊断系统,提升平台用户体验

我们会和业界同行和开源社区保持密切技术交流,在服务好内部用户作业的同时,也会积极反馈社区,共建社区生态。

from:https://mp.weixin.qq.com/s/2rYkFV5xVxJpIP4Qg4r7eg

hive 响应慢问题定位

情景描述:

大数据集群,目前有两套hiveserver2和metastore的集群,通过nginx指向进行流量互切,发现流量打到哪个metastore集群,哪个集群就特别卡顿。

那么先来回顾一下hive整个调用流程和框架

image-20201227123352163

Hive 提供的另外一个shell 客户端,也就是我们常用的hive 命令的客户端它的设计是直接启动了一个org.apache.hadoop.hive.cli.CliDriver的进程,这个进程其实主要包含了两块内容一个是提供给我们交互的cli ,另外一个就是我们的Driver 驱动引擎,这样的设计导致如果我们有多个客户端的情况下,我们就需要有多个Driver

image-20201226204741803

但是我们通过HiveServer2连接的时候我们就可以共享Driver,一方面可以简化客户端的设计降低资源损耗,另外一方面还能降低对MetaStore 的压力,减少连接的个数。

原因分析:

目前来看,变慢的原因应该是出现在hs2服务,metastore服务,具体业务,网络,服务器等原因。

1、先从简单的硬件分析入手,验证网络和服务器,这块省略验证过程

2、验证hs2服务,利用排除的方式,通过hive cli进行多次验证,发现也有缓慢的情况,正常应该1秒内返回,所以先不定位hs2的情况

3、验证metastore服务,还是先从直观简单的分析,看能否找出些现象,先看日志:

3.1 既然通过cli和hs2访问都会慢,先从简单的cli发起请求,可以通过加debug参数,发起查询

beeline –verbose=true –showNestedErrs=true –debug=true 看一下客户端是否有明显异常

3.2 cd /var/log/hive 查看一下是否有大量客户端访问

cat hadoop-cmf-hive-HIVEMETASTORE-data-hadoop-16-2.192.168.0.1.log.out | grep audit | grep -v “ugi=hue” | awk -F “ip=” ‘{print $2}’ | awk ‘{print $1}’ | sort | uniq -c | sort -nr | head

3.3 查看服务GC情况 jstat -gcutil pid interval(ms)

3.4 查看服务端日志

Error: Error while compiling statement: [Error 10308]: Attempt to acquire compile lock timed out. (state=,code=10308)
org.apache.hive.service.cli.HiveSQLException: Error while compiling statement: [Error 10308]: Attempt to acquire compile lock timed out.
at org.apache.hive.jdbc.Utils.verifySuccess(Utils.java:241)
at org.apache.hive.jdbc.Utils.verifySuccessWithInfo(Utils.java:227)
at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:255)
at org.apache.hive.beeline.Commands.executeInternal(Commands.java:989)
at org.apache.hive.beeline.Commands.execute(Commands.java:1180)
at org.apache.hive.beeline.Commands.sql(Commands.java:1094)
at org.apache.hive.beeline.BeeLine.dispatch(BeeLine.java:1180)
at org.apache.hive.beeline.BeeLine.execute(BeeLine.java:1013)
at org.apache.hive.beeline.BeeLine.begin(BeeLine.java:922)
at org.apache.hive.beeline.BeeLine.mainWithInputRedirection(BeeLine.java:518)
at org.apache.hive.beeline.BeeLine.main(BeeLine.java:501)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:226)
at org.apache.hadoop.util.RunJar.main(RunJar.java:141)
Caused by: org.apache.hive.service.cli.HiveSQLException: Error while compiling statement: [Error 10308]: Attempt to acquire compile lock timed out.
at org.apache.hive.service.cli.operation.Operation.toSQLException(Operation.java:400)
at org.apache.hive.service.cli.operation.SQLOperation.prepare(SQLOperation.java:187)
at org.apache.hive.service.cli.operation.SQLOperation.runInternal(SQLOperation.java:271)
at org.apache.hive.service.cli.operation.Operation.run(Operation.java:337)
at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:439)
at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:416)
at org.apache.hive.service.cli.CLIService.executeStatementAsync(CLIService.java:282)
at org.apache.hive.service.cli.thrift.ThriftCLIService.ExecuteStatement(ThriftCLIService.java:503)
at org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1313)
at org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1298)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:56)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

这里显示有获取metastore连接超时的异常,关键的日志是compiling statement: [Error 10308]: Attempt to acquire compile lock timed out. 编译时候获取编译索失败

顺着这个思路,查看一下代码:

关键字:Completed compiling command(queryId,参考https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/lock/CompileLock.java

分析结论如下:

并且hivethrift每次访问都会初始化metastore 重新初始化元数据,HIVESEVER2提交SQL阻塞tryAcquireCompileLock原因:

a、HIVE1.1 只支持串行编译SQL,hiveserver2并发接受到SQL请求后,在complile阶段变为串行执行。当compilie编译慢时,引起阻塞SQL的提交。

b、compile的慢的原因:complile阶段,会通过hivemetastore访问mysql。目前是hiveserver2的请求打入到同一个metastore,流量上来后,hivemetastore访问mysql速度下降。

解决方案:hiveserver2的请求分摊所有hivemetastore上

基于doris+es 搭建用户画像系统(待整理)

olap
画像 doris+es
1、过去画像也是es 好用不代表能用好,
画像场景,看里面人群数,符合某些条件的人群数有多少?统计级
select count() groupby xxx 语义. 重要不优雅,用户需要学习dsl。包一层平台可以解决

人群如果有1000万(有将近1千多万条数据,大概占用空间有 300G 左右),几百万,导出来就比较崩溃,es 怎么用接口,并发导出,分布式读,并发控制,es黑科技,平台复杂度较高。普通用户对于高级玩法做不到。

几十万,几十分钟很正常,用游标scoll来做,如果失败,需要重跑,几千万,上亿就跑不出来了

doris on es 实现,列存,本地优先扫描,不排序(es 本身会排序,节省了很多计算开销),性能解决了,并且可以直接写sql了,不用写dsl了,扩展性,用户可以直接写sql了。中台必要的能力

更新场景,doris 更新不好,提升查询性能,需要做很多预计算,预计算需要有规则的。分析场景,维度跟指标要定义好,可以通过维度可以把指标预计算出来,画像场景,主键或者叫维度,更新大量是指标,查的时候要根据指标去查,某个年龄,金额大于xxx的,某个地域是xxx的人群是多少个,属性来查询。doris需要索引,排序,查询速度非常快,因为会预聚合,更新的结果应不应该在结果集中,性能会掉的很厉害,但是更新满足不了,
es 通过docid就可以更新了,性能一般,但是好歹是支持的。并且不影响读取。

所以画像场景,es把更新搞定,doris把上层查询搞定,查询快,有sql接口.

画像场景key 就是uid,其他都是属性一直在更新。预计算,就是终端类型android ios,地市,我想看北京市下的ios的用户数总共有多少人,报名人数,这种就是根据维度,预聚合求和,
kylin是所有维度预计算变成kv查询,这样会有维度爆炸,存储成本很高,加维度不方便,无法实时
,doris可以实时更新维度 tables schema change,上线没有kylin那么快,不会是毫秒级别的,做不到kylin那种kv查询,全部用预计算,比如要算几千万uv,每天都要groupby,如果现查询,肯定做不到秒出,

doris 要源源不断的更新的,并且可以计算的很好。也有物化视图,功能米有ck那么丰富,用不到太高级的能力,加一个rollup上卷就搞定了,一个物化视图的子集
clickhouse是一坨数据扔进去,查询会非常快

k1 k2 k3(城市) 维度列,pv uv 指标求最大,最小值,求k1 k3 算pv uv 可以k1 k3 pv uv做一个上卷,直接可以命中rollup列
clichouse 物化视图是跑sql,定期跑sql,doris做的是实时的,不是微批的,写的时候就把预计合全做了。

doris 预计算会把结果存在一个独立的表中,对外是可见的,sql会自动路由到rollup表里,自动分析sql的,可以指定维度和指标做预计算,业务是知道哪些是常用列,

es—doris+es 数据是不用动的,doris把es当成一个外表,挂上去就可以了,实时可用,外表创建成功了,fe模块会跟es元数据信息拉取es原信息,shard信息会拉取过来,doris访问失败,基本上都是因为es某个节点失败了,运维排查一下就好了

dorisdb数据更新,es就不用了,es维护成本太高了,需要写两套不同的实现,写入es keyword,doris必须是varchar int就会有问题,sql就解析失败了,doris有date类型,datetime有可能有不兼容问题,会查不出来。

es 类型不敏感,类型对应不上就会很麻烦。现在客户是通过平台化统一了,做了对应关系。

最终一致性,事务要求也不高,从业务库过来变成日志流了,客户对doris熟悉,人和事都认识,文档技术大家都知道,有问题会有人快速响应。

clickhouse运维复杂,c++,很难运维

目前所有分析都用doris,字段变更也很常见,现在有平台,改完字段,可以直接掉doris接口,alter table接口,不影响业务,无感知的

系统要能把控,技术做不深入 做proxy 做一些屏蔽 做平台化建设。在平台化解决掉

教学和拉新,1、统计类,分析
拉新,投放 端上PV uv多少人+业务的指标 来源,维度列可以枚举出来的。直接用doris

doris+es
1、老业务,裸用es 迁移doris 有成本。
2、微批架构 lamada kappy flink读业务流写到两个es表,ods–dwd,跟业务侧比较近,上面写sql 5分钟调度一次,当前时间往前推5分钟,a表更新3行,跟b表进行更新,如果join到了,说明b表也有更新了,

flinksql跟doris语法不一样,开发环境也不同,dwd dws ad 全部都是doris sql
select xxxx into xxx表,5分钟调度一次

flink实时,多留Join很麻烦

选择标准,微批数据不能太多,如果数据太多,还是走离线回灌回去

flink更新数据量很大,用redis做维表,doris不适合

对于客户要求要计算时间较长的数据,从doris再同步到hive里面去一份,这样就可以了,短时查询用doris,长周期用hive查,一个指标出口复用,但是不同需求,不同语法是合理的,目前在跟投放合作,还没有大规模用起来

腾讯没有调度系统

airflow 运维工作,对客户没有价值,体量?

clickhouse 可以通过shuddingkey 让多个表相同数据到同一个节点吗?

sqoop用法之mysql与hive数据导入导出

本文目录

一. Sqoop介绍
二. Mysql 数据导入到 Hive
三. Hive数据导入到Mysql
四. mysql数据增量导入hive

  • 1 基于递增列Append导入
    • 1). 创建hive
    • 2). 创建job
    • 3) 执行job
  • Lastmodified 导入实战
    • 1). 新建一张表
    • 2). 初始化hive表:

一. Sqoop介绍

Sqoop是一个用来将Hadoop和关系型数据库中的数据相互转移的工具,可以将一个关系型数据库(例如:MySQL、Oracle、Postgres等)中的数据导进到HadoopHDFS中,也可以将HDFS的数据导进到关系型数据库中。对于某些NoSQL数据库它也提供了连接器。Sqoop,类似于其他ETL工具,使用元数据模型来判断数据类型并在数据从数据源转移到Hadoop时确保类型安全的数据处理。Sqoop专为大数据批量传输设计,能够分割数据集并创建Hadoop任务来处理每个区块。

本文版本说明

hadoop版本 : hadoop-2.7.2
hive版本 : hive-2.1.0
sqoop版本:sqoop-1.4.6

二. Mysql 数据导入到 Hive

1). 将mysqlpeople_access_log表导入到hiveweb.people_access_log,并且hive中的表不存在。
mysql中表people_access_log数据为:

1,15110101010,1577003281739,'112.168.1.2','https://www.baidu.com'
2,15110101011,1577003281749,'112.16.1.23','https://www.baidu.com'
3,15110101012,1577003281759,'193.168.1.2','https://www.taobao.com'
4,15110101013,1577003281769,'112.18.1.2','https://www.baidu.com'
5,15110101014,1577003281779,'112.168.10.2','https://www.baidu.com'
6,15110101015,1577003281789,'11.168.1.2','https://www.taobao.com'

mysql数据导入hive的命令为:

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log \
-m 1 \
--hive-import \
--create-hive-table \
--fields-terminated-by '\t' \
--hive-table web.people_access_log

该命令会启用一个mapreduce任务,将mysql数据导入到hive表,并且指定了hive表的分隔符为\t,如果不指定则为默认分隔符^A(ctrl+A)

参数说明

参数说明
--connectmysql的连接信息
--usernamemysql的用户名
--passwordmysql的密码
--table被导入的mysql源表名
-m并行导入启用的map任务数量,与--num-mapper含义一样
--hive-import插入数据到hive当中,使用hive默认的分隔符,可以使用--fields-terminated-by参数来指定分隔符。
-- hive-tablehive当中的表名

2). 也可以通过--query条件查询Mysql数据,将查询结果导入到Hive

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--query 'select * from people_access_log where \$CONDITIONS and url = "https://www.baidu.com"' \
--target-dir /user/hive/warehouse/web/people_access_log \
--delete-target-dir \
--fields-terminated-by '\t' \
-m 1
参数说明
--query后接查询语句,条件查询需要\$CONDITIONS and连接查询条件,这里的\$表示转义$,必须有.
--delete-target-dir如果目标hive表目录存在,则删除,相当于overwrite.

三. Hive数据导入到Mysql

还是使用上面的hiveweb.people_access_log,将其导入到mysql中的people_access_log_out表中.

sqoop export \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log_out \
--input-fields-terminated-by '\t' \
--export-dir /user/hive/warehouse/web.db/people_access_log \
--num-mappers 1

注意:mysqlpeople_access_log_out需要提前建好,否则报错:ErrorException: Table 'test.people_access_log_out' doesn't exist。如果有id自增列,hive表也需要有,hive表与mysql表字段必须完全相同。

create table people_access_log_out like people_access_log;

执行完一个mr任务后,成功导入到mysqlpeople_access_log_out中.

四. mysql数据增量导入hive

实际中mysql数据会不断增加,这时候需要用sqoop将数据增量导入hive,然后进行海量数据分析统计。增量数据导入分两种,一是基于递增列的增量数据导入(Append方式)。二是基于时间列的增量数据导入(LastModified方式)。有几个核心参数:

  • –check-column:用来指定一些列,这些列在增量导入时用来检查这些数据是否作为增量数据进行导入,和关系型数据库中的自增字段及时间戳类似.注意:这些被指定的列的类型不能使任意字符类型,如char、varchar等类型都是不可以的,同时–check-column可以去指定多个列
  • –incremental:用来指定增量导入的模式,两种模式分别为AppendLastmodified
  • –last-value:指定上一次导入中检查列指定字段最大值

1. 基于递增列Append导入

接着前面的日志表,里面每行有一个唯一标识自增列ID,在关系型数据库中以主键形式存在。之前已经将id在0~6之间的编号的订单导入到Hadoop中了(这里为HDFS),现在一段时间后我们需要将近期产生的新的订 单数据导入Hadoop中(这里为HDFS),以供后续数仓进行分析。此时我们只需要指定–incremental 参数为append–last-value参数为6即可。表示只从id大于6后即7开始导入。

1). 创建hive

首先我们需要创建一张与mysql结构相同的hive表,假设指定字段分隔符为\t,后面导入数据时候分隔符也需要保持一致。

2). 创建job

增量导入肯定是多次进行的,可能每隔一个小时、一天等,所以需要创建计划任务,然后定时执行即可。我们都知道hive的数据是存在hdfs上面的,我们创建sqoop job的时候需要指定hive的数据表对应的hdfs目录,然后定时执行这个job即可。

当前mysql中数据,hive中数据与mysql一样也有6条:

iduser_idaccess_timeipurl
1151101010101577003281739112.168.1.2https://www.baidu.com
2151101010111577003281749112.16.1.23https://www.baidu.com
3151101010121577003281759193.168.1.2https://www.taobao.com
4151101010131577003281769112.18.1.2https://www.baidu.com
5151101010141577003281779112.168.10.2https://www.baidu.com
615110101015157700328178911.168.1.2https://www.taobao.com

增量导入有几个参数,保证下次同步的时候可以接着上次继续同步.

sqoop job --create mysql2hive_job -- import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log \
--target-dir /user/hive/warehouse/web.db/people_access_log \
--check-column id \
--incremental append \
--fields-terminated-by '\t' \
--last-value 6 \
-m 1

这里通过sqoop job --create job_name命令创建了一个名为mysql2hive_jobsqoop job

3). 执行job

创建好了job,后面只需要定时周期执行这个提前定义好的job即可。我们先往mysql里面插入2条数据。

INSERT INTO `people_access_log` (`id`,`user_id`,`access_time`,`ip`,`url`) VALUES
(7,15110101016,1577003281790,'112.168.1.3','https://www.qq.com'),
(8,15110101017,1577003281791,'112.1.1.3','https://www.microsoft.com');

这样mysql里面就会多了2条数据。此时hive里面只有id1 ~ 6的数据,执行同步job使用以下命令。

sqoop job -exec mysql2hive_job

执行完成后,发现刚才mysql新加入的id7 ~ 8的两条数据已经同步到hive

hive> select * from web.people_access_log;
OK
1   15110101010 1577003281739   112.168.1.2 https://www.baidu.com
2   15110101011 1577003281749   112.16.1.23 https://www.baidu.com
3   15110101012 1577003281759   193.168.1.2 https://www.taobao.com
4   15110101013 1577003281769   112.18.1.2  https://www.baidu.com
5   15110101014 1577003281779   112.168.10.2    https://www.baidu.com
6   15110101015 1577003281789   11.168.1.2  https://www.taobao.com
7   15110101016 1577003281790   112.168.1.3 https://www.qq.com
8   15110101017 1577003281791   112.1.1.3   https://www.microsoft.com

由于实际场景中,mysql表中的数据,比如订单表等,通常是一致有数据进入的,这时候只需要将sqoop job -exec mysql2hive_job这个命令定时(比如说10分钟频率)执行一次,就能将数据10分钟同步一次到hive数据仓库。

2. Lastmodified 导入实战

append适合业务系统库,一般业务系统表会通过自增ID作为主键标识唯一性。Lastmodified适合ETL的数据根据时间戳字段导入,表示只导入比这个时间戳大,即比这个时间晚的数据。

1). 新建一张表

mysql中新建一张表people_access_log2,并且初始化几条数据:

CREATE TABLE `people_access_log2` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
  `user_id` bigint(20) unsigned NOT NULL COMMENT '用户id',
  `access_time` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `ip` varchar(15) NOT NULL COMMENT '访客ip',
  `url` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

插入数据:

insert into people_access_log2(id,user_id, ip, url) values(1,15110101010,'112.168.1.200','https://www.baidu.com');
insert into people_access_log2(id,user_id, ip, url) values(2,15110101011,'112.16.1.2','https://www.baidu.com');
insert into people_access_log2(id,user_id, ip, url) values(3,15110101012,'112.168.1.2','https://www.taobao.com');
insert into people_access_log2(id,user_id, ip, url) values(4,15110101013,'112.168.10.2','https://www.baidu.com');
insert into people_access_log2(id,user_id, ip, url) values(5,15110101014,'112.168.1.2','https://www.jd.com');
insert into people_access_log2(id,user_id, ip, url) values(6,15110101015,'112.168.12.4','https://www.qq.com');

mysql里面的数据就是这样:

iduser_idaccess_timeipurl
1151101010102019-12-28 16:23:10112.168.1.200https://www.baidu.com
2151101010112019-12-28 16:23:33112.16.1.2https://www.baidu.com
3151101010122019-12-28 16:23:41112.168.1.2https://www.taobao.com
4151101010132019-12-28 16:23:46112.168.10.2https://www.baidu.com
5151101010142019-12-28 16:23:52112.168.1.2https://www.jd.com
6151101010152019-12-28 16:23:56112.168.12.4https://www.qq.

2). 初始化hive表:

初始化hive数据,将mysql里面的6条数据导入hive中,并且可以自动帮助我们创建对应hive表,何乐而不为,否则我们需要自己手动创建,完成初始化工作。

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log2 \
--hive-import \
--create-hive-table \
--fields-terminated-by ',' \
--hive-table web.people_access_log2

可以看到执行该命令后,启动了二一个mapreduce任务,这样6条数据就进入hiveweb.people_access_log2了:

hive> select * from web.people_access_log2;
OK
1   15110101010 2019-12-28 16:23:10.0   112.168.1.200   https://www.baidu.com
2   15110101011 2019-12-28 16:23:33.0   112.16.1.2  https://www.baidu.com
3   15110101012 2019-12-28 16:23:41.0   112.168.1.2 https://www.taobao.com
4   15110101013 2019-12-28 16:23:46.0   112.168.10.2    https://www.baidu.com
5   15110101014 2019-12-28 16:23:52.0   112.168.1.2 https://www.jd.com
6   15110101015 2019-12-28 16:23:56.0   112.168.12.4    https://www.qq.com
Time taken: 0.326 seconds, Fetched: 6 row(s)

3). 增量导入数据:

我们再次插入一条数据进入mysqlpeople_access_log2表:

insert into people_access_log2(id,user_id, ip, url) values(7,15110101016,'112.168.12.45','https://www.qq.com');

此时,mysql表里面已经有7条数据了,我们使用incremental的方式进行增量的导入到hive:

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log2 \
--hive-import \
--hive-table people_access_log2 \
-m 1 \
--check-column access_time \
--incremental lastmodified \
--last-value "2019-12-28 16:23:56" \

2019-12-28 16:23:56就是第6条数据的时间,这里需要指定。报错了:

19/12/28 16:17:25 ERROR tool.ImportTool: Error during import: --merge-key or --append is required when using --incremental lastmodified and the output directory exists.

注意:可以看到--merge-key or --append is required when using --incremental lastmodified意思是,这种基于时间导入模式,需要指定--merge-key或者--append参数,表示根据时间戳导入,数据是直接在末尾追加(append)还是合并(merge),这里使用merge方式,根据id合并:

sqoop import \
--connect jdbc:mysql://master1.hadoop:3306/test \
--username root \
--password 123456 \
--table people_access_log2 \
--hive-import \
--hive-table web.people_access_log2 \
--check-column access_time \
--incremental lastmodified \
--last-value "2019-12-28 16:23:56" \
--fields-terminated-by ',' \
--merge-key id

执行该命令后,与直接导入不同,该命令启动了2个mapreduce任务,这样就把数据增量merge导入hive表了.

hive> select * from web.people_access_log2 order by id;
OK
1   15110101010 2019-12-28 16:23:10.0   112.168.1.200   https://www.baidu.com
2   15110101011 2019-12-28 16:23:33.0   112.16.1.2  https://www.baidu.com
3   15110101012 2019-12-28 16:23:41.0   112.168.1.2 https://www.taobao.com
4   15110101013 2019-12-28 16:23:46.0   112.168.10.2    https://www.baidu.com
5   15110101014 2019-12-28 16:23:52.0   112.168.1.2 https://www.jd.com
6   15110101015 2019-12-28 16:23:56.0   112.168.12.4    https://www.qq.com
6   15110101015 2019-12-28 16:23:56.0   112.168.12.4    https://www.qq.com
7   15110101016 2019-12-28 16:28:24.0   112.168.12.45   https://www.qq.com
Time taken: 0.241 seconds, Fetched: 8 row(s)

可以看到id=6的数据,有2条,它的时间刚好是--last-value指定的时间,则会导入大于等于--last-value指定时间的数据,这点需要注意。

转载请注明:柯广的网络日志 » sqoop用法之mysql与hive数据导入导出

Spark提交任务流程

Spark 调用了 AmRmClient 的 API addContainerRequest

AmRmClient 在处理 addContainerRequest 时,会针对每个 ContainerRequest 生成一个ResourceRequest

但是 ResourceRequest 接下来是由为 ResourceRequestInfo 处理的,这个是缓存在 (priority, resourceName, executionType) 三元组为 key 的 map 中的,每次取到了都会重新 set labelExpression。尽管 labelExpression 这里被 random 处理了,但只保留了最后一次。

问:是否可以把 ResourceRequestInfo 的 map 多加一层 label的,这样就能保留每次不同的 labelExpression 了?

答:应该是am去申请的container的时候,标签是随机的,一半提交到了资源紧张的分区,被pending了,客户端因为conainer已经申请完了,不会新申请container,这里要看看yarn有没有重新申请conainer的能力,如果pengding超过一定时间

问:但是我理解每次 assignContainerToNodes 都是以 nm node 为单位的哈,1500台 nm 的集群都没有资源在pending,只分配到小集群的概率太小了,应该不会稳定地出现一边倒的情况。上面分析的结论是,多个 containerRequest 会被合并成一个,所以只带了一个标签。

答:现在现象是长尾任务containaienr都pengding在资源比较少的那些分区是吧。所有的containerrequest本来就是一批哈 咱们这做的就是APP的container分配到同一个区分。

问:短任务也会,长任务时间长了可以跑满到所有分区。猜测是后面几个 stage 对应的request成功分配到了默认分区里。这个可能咱们还得对一下,不然有些典型场景可能会有问题哈。比如如果默认分区几乎跑满了,刚刚弹性扩容出来的分区是空的,所有任务仍然只有一半概率跑到扩容出来的分区上,除非默认分区 100% pending,才有可能在重试的时候都跑到新分区上去。同理,如果倒换过来,一个超过了弹性分区可用资源规模的任务调度到了弹性分区上,也有可能导致一些问题。我理解这种模式可能适合大量的小任务,但是大任务有较小任务更大的概率会变得更慢。