Output Data Size Increase After Switching to MR

I. Symptoms

A while back we were switching our data sync jobs over to MR. After the switch, out of 36 jobs in total, 10 produced smaller output, 16 stayed the same, and 10 got bigger, and the bigger ones roughly doubled in size. Company storage was fairly tight at the time, so the size increase had to be fixed.

The screenshots below come from parsing the output files with parquet-tools. On quite a few columns, outscan (left) achieves a noticeably better compression ratio than mr (right).

ods_trd_buyer_order_app_order_desc_info_d

[figure image-20210127165937262: parquet-tools column comparison, outscan (left) vs mr (right)]

The difference is even more pronounced for ods_itm_wd_inv_app_item_sku_d:

[figure image-20210127165912092: parquet-tools column comparison, outscan (left) vs mr (right)]

II. Investigation

1. Differences between outscan (the previous sync method; referenced without further explanation below) and mr sync

| outscan | mr |
| --- | --- |
| Maps the HFiles directly; no reduce phase | Maps the text incremental data and the parquet full data separately, then reduces |
| Output class: AvroParquetOutputFormat | Output class: ParquetOutputFormat |

2. Trying to tune the parquet parameters

First, a diagram of the parquet file format:

[figure image-20210127165049558: parquet file format layout]

As the diagram shows, the parts of a parquet file we care about most are the RowGroup, the Column, and the Page.

When writing a parquet file, a RowGroup is constructed first, then a Column for each column inside it, then Pages inside each Column, and data is written into the Pages. On every write, accountForValueWritten() decides whether a writePage is needed; if so, the data buffered in the current Column is written out to a collector, and this is also where snappy compression happens. Once the whole RowGroup reaches parquet.block.size, the entire RowGroup is written out to HDFS.
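The two size checks described above can be sketched in a few lines of plain Java. This is a toy model, not the real InternalParquetRecordWriter; the thresholds and names are stand-ins:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the two thresholds: a page is cut when its buffer passes a
// page-size limit, and the whole row group is flushed when the accumulated
// memory passes a block-size limit (parquet.block.size in the real writer).
public class RowGroupSketch {
    static final long PAGE_SIZE = 1024;      // stand-in for parquet.page.size
    static final long BLOCK_SIZE = 8 * 1024; // stand-in for parquet.block.size

    long pageBuffered = 0;   // bytes buffered in the current page
    long groupBuffered = 0;  // bytes buffered in the current row group
    List<String> events = new ArrayList<>();

    // Mirrors the per-value check done by accountForValueWritten().
    void write(long valueBytes) {
        pageBuffered += valueBytes;
        groupBuffered += valueBytes;
        if (pageBuffered >= PAGE_SIZE) {   // writePage: compress and hand to the collector
            events.add("writePage");
            pageBuffered = 0;
        }
        if (groupBuffered >= BLOCK_SIZE) { // flush the whole row group to HDFS
            events.add("flushRowGroup");
            groupBuffered = 0;
        }
    }
}
```

Note that the real writer's row-group check compares buffered (already encoded) memory rather than raw value bytes, which is why identical parquet.block.size settings can still hold different record counts per row group.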

| Parameter tried | Value(s) tried | Effect on output size |
| --- | --- | --- |
| parquet.block.size | 1G | essentially none |
| parquet.page.size | 1M-128M | essentially none |
| parquet.dictionary.page.size | 1M-128M | ~20% smaller when reading only text; essentially none in the production job |
| io.compression.codec.snappy.buffersize | 1M | essentially none |
| io.file.buffer.size | 1M | essentially none |
| output class switched to AvroParquet | - | essentially none |
| parquet version upgrade | - | essentially none |
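For reference, this is roughly how those knobs are set on an MR job. It is a config fragment, not a runnable program; it assumes parquet-hadoop and hadoop-client on the classpath, and the job name is made up:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

Configuration conf = new Configuration();
// parquet-side knobs (the constants resolve to the keys listed in the table above)
conf.setInt(ParquetOutputFormat.BLOCK_SIZE, 128 * 1024 * 1024);      // parquet.block.size
conf.setInt(ParquetOutputFormat.PAGE_SIZE, 1024 * 1024);             // parquet.page.size
conf.setInt(ParquetOutputFormat.DICTIONARY_PAGE_SIZE, 1024 * 1024);  // parquet.dictionary.page.size
// hadoop-side buffers
conf.setInt("io.compression.codec.snappy.buffersize", 1024 * 1024);
conf.setInt("io.file.buffer.size", 1024 * 1024);

Job job = Job.getInstance(conf, "sync-to-parquet"); // hypothetical job name
job.setOutputFormatClass(ParquetOutputFormat.class);
ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
```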

3. Comparing the outscan and mr logs

a) outscan
Dec 11, 2020 4:08:22 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig: Compression: SNAPPY
Dec 11, 2020 4:08:22 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: Parquet block size to 134217728
Dec 11, 2020 4:08:22 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: Parquet page size to 1048576
Dec 11, 2020 4:08:22 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: Parquet dictionary page size to 1048576
Dec 11, 2020 4:08:22 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: Dictionary is on
Dec 11, 2020 4:08:22 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: Validation is off
Dec 11, 2020 4:08:22 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: Writer version is: PARQUET_1_0
Dec 11, 2020 4:08:22 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: Maximum row group padding size is 0 bytes
Dec 11, 2020 4:08:54 PM INFO: org.apache.parquet.hadoop.InternalParquetRecordWriter: mem size 134,279,407 > 134,217,728: flushing 1,035,273 records to disk.
Dec 11, 2020 4:08:54 PM INFO: org.apache.parquet.hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 134,097,175
Dec 11, 2020 4:08:54 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 4,122,796B for [auto_id] INT64: 1,035,273 values, 7,512,463B raw, 4,122,414B comp, 8 pages, encodings: [BIT_PACKED, PLAIN, RLE, PLAIN_DICTIONARY], dic { 94,035 entries, 752,280B raw, 94,035B comp}
Dec 11, 2020 4:08:54 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 5,925,458B for [id] INT64: 1,035,273 values, 8,282,248B raw, 5,925,075B comp, 8 pages, encodings: [BIT_PACKED, PLAIN, RLE]
Dec 11, 2020 4:08:54 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 4,213,757B for [item_id] INT64: 1,035,273 values, 7,507,266B raw, 4,213,375B comp, 8 pages, encodings: [BIT_PACKED, PLAIN, RLE, PLAIN_DICTIONARY], dic { 83,192 entries, 665,536B raw, 83,192B comp}
Dec 11, 2020 4:08:54 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 1,449,023B for [price] BINARY: 1,035,273 values, 1,548,410B raw, 1,448,622B comp, 9 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 13,877 entries, 135,544B raw, 13,877B comp}
Dec 11, 2020 4:08:54 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 305,529B for [quantity] INT64: 1,035,273 values, 601,051B raw, 305,153B comp, 8 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 1,455 entries, 11,640B raw, 1,455B comp}
Dec 11, 2020 4:08:54 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 1,585,004B for [total_price] BINARY: 1,035,273 values, 1,726,198B raw, 1,584,599B comp, 9 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 33,741 entries, 334,219B raw, 33,741B comp}
Dec 11, 2020 4:08:54 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 25,257B for [cps_fee] BINARY: 1,035,273 values, 50,010B raw, 24,968B comp, 8 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 314 entries, 2,627B raw, 314B comp}
Dec 11, 2020 4:08:54 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 2,715,269B for [item_sku_id] INT64: 1,035,273 values, 6,616,763B raw, 2,714,888B comp, 8 pages, encodings: [BIT_PACKED, PLAIN, RLE, PLAIN_DICTIONARY], dic { 104,273 entries, 834,184B raw, 104,273B comp}
Dec 11, 2020 4:08:54 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 6,605,807B for [add_time] BINARY: 1,035,273 values, 22,843,179B raw, 6,604,199B comp, 23 pages, encodings: [BIT_PACKED, PLAIN, RLE, PLAIN_DICTIONARY], dic { 27,693 entries, 636,939B raw, 27,693B comp}
Dec 11, 2020 4:08:55 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 4,098,234B for [order_id] INT64: 1,035,273 values, 7,490,983B raw, 4,097,852B comp, 8 pages, encodings: [BIT_PACKED, PLAIN, RLE, PLAIN_DICTIONARY], dic { 76,367 entries, 610,936B raw, 76,367B comp}
Dec 11, 2020 4:08:55 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 402B for [sk] BINARY: 1,035,273 values, 94B raw, 102B comp, 4 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 7 entries, 392B raw, 7B comp}
Dec 11, 2020 4:08:55 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 5,835,688B for [item_sku_title] BINARY: 1,035,273 values, 12,055,000B raw, 5,834,969B comp, 13 pages, encodings: [BIT_PACKED, PLAIN, RLE, PLAIN_DICTIONARY], dic { 23,553 entries, 567,428B raw, 23,553B comp}
Dec 11, 2020 4:08:55 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 34,929,921B for [item_title] BINARY: 1,035,273 values, 67,065,375B raw, 34,918,001B comp, 65 pages, encodings: [BIT_PACKED, PLAIN, RLE, PLAIN_DICTIONARY], dic { 9,636 entries, 720,750B raw, 9,636B comp}
Dec 11, 2020 4:08:55 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 23,697,146B for [img_head] BINARY: 1,035,273 values, 62,188,305B raw, 23,689,804B comp, 61 pages, encodings: [BIT_PACKED, PLAIN, RLE, PLAIN_DICTIONARY], dic { 11,114 entries, 692,307B raw, 11,114B comp}
Dec 11, 2020 4:08:55 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 15,024,817B for [extend] BINARY: 1,035,273 values, 59,829,153B raw, 15,022,915B comp, 56 pages, encodings: [BIT_PACKED, PLAIN, RLE]
Dec 11, 2020 4:08:55 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 456B for [data_ver] INT64: 1,035,273 values, 96B raw, 112B comp, 8 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 1 entries, 8B raw, 1B comp}
Dec 11, 2020 4:08:55 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 6,496,780B for [update_time] BINARY: 1,035,273 values, 22,843,365B raw, 6,495,172B comp, 23 pages, encodings: [BIT_PACKED, PLAIN, RLE, PLAIN_DICTIONARY], dic { 27,530 entries, 633,190B raw, 27,530B comp}
Dec 11, 2020 4:08:55 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 486,705B for [buyer_id] BINARY: 1,035,273 values, 1,757,571B raw, 486,033B comp, 14 pages, encodings: [BIT_PACKED, PLAIN, RLE, PLAIN_DICTIONARY], dic { 72,340 entries, 980,153B raw, 72,340B comp}
Dec 11, 2020 4:08:55 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 1,248,275B for [seller_id] INT32: 1,035,273 values, 1,440,252B raw, 1,248,119B comp, 4 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 112,471 entries, 449,884B raw, 112,471B comp}
Dec 11, 2020 4:08:55 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 164,710B for [f_seller_id] INT32: 1,035,273 values, 251,370B raw, 164,554B comp, 4 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 12,392 entries, 49,568B raw, 12,392B comp}
Dec 11, 2020 4:08:55 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 149,608B for [status] INT32: 1,035,273 values, 158,031B raw, 149,452B comp, 4 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 3 entries, 12B raw, 3B comp}
Dec 11, 2020 4:08:55 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 456B for [rate_fund] INT64: 1,035,273 values, 96B raw, 112B comp, 8 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 1 entries, 8B raw, 1B comp}
Dec 11, 2020 4:08:55 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 169,390B for [version_id] INT32: 1,035,273 values, 231,460B raw, 169,234B comp, 4 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 13 entries, 52B raw, 13B comp}
Dec 11, 2020 4:08:55 PM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 67,464B for [refund_status] INT32: 1,035,273 values, 89,951B raw, 67,308B comp, 4 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 4 entries, 16B raw, 4B comp}
b) mr
Dec 11, 2020 12:09:45 AM INFO: org.apache.parquet.hadoop.codec.CodecConfig: Compression: SNAPPY
Dec 11, 2020 12:09:45 AM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: Parquet block size to 134217728
Dec 11, 2020 12:09:45 AM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: Parquet page size to 1048576
Dec 11, 2020 12:09:45 AM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: Parquet dictionary page size to 1048576
Dec 11, 2020 12:09:45 AM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: Dictionary is on
Dec 11, 2020 12:09:45 AM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: Validation is off
Dec 11, 2020 12:09:45 AM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: Writer version is: PARQUET_1_0
Dec 11, 2020 12:09:45 AM INFO: org.apache.parquet.hadoop.ParquetOutputFormat: Maximum row group padding size is 0 bytes
Dec 11, 2020 12:10:01 AM INFO: org.apache.parquet.hadoop.InternalParquetRecordWriter: mem size 134,596,338 > 134,217,728: flushing 679,290 records to disk.
Dec 11, 2020 12:10:01 AM INFO: org.apache.parquet.hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 135,665,494
Dec 11, 2020 12:10:01 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 84,603B for [auto_id] INT64: 679,290 values, 84,384B raw, 84,323B comp, 6 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 22,331 entries, 178,648B raw, 22,331B comp}
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 5,121,794B for [id] INT64: 679,290 values, 5,434,368B raw, 5,121,507B comp, 6 pages, encodings: [BIT_PACKED, RLE, PLAIN]
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 4,114,386B for [item_id] INT64: 679,290 values, 5,434,368B raw, 4,114,099B comp, 6 pages, encodings: [BIT_PACKED, RLE, PLAIN]
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 1,140,171B for [price] BINARY: 679,290 values, 1,158,473B raw, 1,139,909B comp, 6 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 12,680 entries, 123,547B raw, 12,680B comp}
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 281,422B for [quantity] INT64: 679,290 values, 643,817B raw, 281,140B comp, 6 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 1,213 entries, 9,704B raw, 1,213B comp}
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 1,225,975B for [total_price] BINARY: 679,290 values, 1,243,295B raw, 1,225,712B comp, 6 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 27,429 entries, 269,122B raw, 27,429B comp}
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 1,977B for [cps_fee] BINARY: 679,290 values, 2,390B raw, 1,760B comp, 6 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 165 entries, 1,357B raw, 165B comp}
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 2,572,263B for [item_sku_id] INT64: 679,290 values, 4,662,155B raw, 2,571,977B comp, 6 pages, encodings: [BIT_PACKED, RLE, PLAIN, PLAIN_DICTIONARY], dic { 70,731 entries, 565,848B raw, 70,731B comp}
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 6,933,270B for [add_time] BINARY: 679,290 values, 15,623,790B raw, 6,932,221B comp, 15 pages, encodings: [BIT_PACKED, RLE, PLAIN]
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 5,037,909B for [order_id] INT64: 679,290 values, 5,434,368B raw, 5,037,622B comp, 6 pages, encodings: [BIT_PACKED, RLE, PLAIN]
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 332B for [sk] BINARY: 679,290 values, 117B raw, 123B comp, 3 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 9 entries, 548B raw, 9B comp}
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 5,675,428B for [item_sku_title] BINARY: 679,290 values, 7,868,590B raw, 5,674,984B comp, 9 pages, encodings: [BIT_PACKED, RLE, PLAIN, PLAIN_DICTIONARY], dic { 30,819 entries, 729,235B raw, 30,819B comp}
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 34,917,594B for [item_title] BINARY: 679,290 values, 41,284,243B raw, 34,910,519B comp, 41 pages, encodings: [BIT_PACKED, RLE, PLAIN, PLAIN_DICTIONARY], dic { 15,403 entries, 1,011,183B raw, 15,403B comp}
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 27,409,350B for [img_head] BINARY: 679,290 values, 41,769,032B raw, 27,405,580B comp, 41 pages, encodings: [BIT_PACKED, RLE, PLAIN, PLAIN_DICTIONARY], dic { 15,612 entries, 1,014,148B raw, 15,612B comp}
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 16,932,320B for [extend] BINARY: 679,290 values, 42,638,909B raw, 16,930,559B comp, 42 pages, encodings: [BIT_PACKED, RLE, PLAIN, PLAIN_DICTIONARY], dic { 17,372 entries, 1,008,909B raw, 17,372B comp}
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 342B for [data_ver] INT64: 679,290 values, 72B raw, 84B comp, 6 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 1 entries, 8B raw, 1B comp}
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 6,800,709B for [update_time] BINARY: 679,290 values, 15,623,790B raw, 6,799,660B comp, 15 pages, encodings: [BIT_PACKED, RLE, PLAIN]
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 6,194,701B for [buyer_id] BINARY: 679,290 values, 9,200,418B raw, 6,194,344B comp, 9 pages, encodings: [BIT_PACKED, RLE, PLAIN]
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 1,464,540B for [seller_id] INT32: 679,290 values, 1,464,273B raw, 1,464,423B comp, 3 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 131,083 entries, 524,332B raw, 131,083B comp}
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 295,854B for [f_seller_id] INT32: 679,290 values, 550,175B raw, 295,737B comp, 3 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 16,979 entries, 67,916B raw, 16,979B comp}
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 152,287B for [status] INT32: 679,290 values, 161,460B raw, 152,170B comp, 3 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 3 entries, 12B raw, 3B comp}
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 342B for [rate_fund] INT64: 679,290 values, 72B raw, 84B comp, 6 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 1 entries, 8B raw, 1B comp}
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 226,851B for [version_id] INT32: 679,290 values, 309,556B raw, 226,734B comp, 3 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 16 entries, 64B raw, 16B comp}
Dec 11, 2020 12:10:02 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 65,837B for [refund_status] INT32: 679,290 values, 101,856B raw, 65,720B comp, 3 pages, encodings: [BIT_PACKED, RLE, PLAIN_DICTIONARY], dic { 4 entries, 16B raw, 4B comp}

With parquet.block.size, page.size, and dictionary.page.size all identical, mr fits fewer records into a single row group than outscan does (679,290 versus 1,035,273 before the 128MB flush). This is the compression-ratio difference showing up: the worse mr's encoding and compression, the fewer records fit into the same buffered memory.

4. Reading the source code

a) snappy

`org.apache.parquet.hadoop.CodecFactory.BytesCompressor#compress`:

```java
public BytesInput compress(BytesInput bytes) throws IOException {
  final BytesInput compressedBytes;
  if (codec == null) {
    compressedBytes = bytes;
  } else {
    compressedOutBuffer.reset();
    if (compressor != null) {
      // null compressor for non-native gzip
      compressor.reset();
    }
    CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor);
    bytes.writeAllTo(cos);
    cos.finish();
    cos.close();
    compressedBytes = BytesInput.from(compressedOutBuffer);
  }
  return compressedBytes;
}
```

`org.apache.parquet.hadoop.codec.SnappyCompressor#compress`:

```java
@Override
public synchronized int compress(byte[] buffer, int off, int len) throws IOException {
  SnappyUtil.validateBuffer(buffer, off, len);

  if (needsInput()) {
    // No buffered output bytes and no input to consume, need more input
    return 0;
  }

  if (!outputBuffer.hasRemaining()) {
    // There is uncompressed input, compress it now
    int maxOutputSize = Snappy.maxCompressedLength(inputBuffer.position());
    if (maxOutputSize > outputBuffer.capacity()) {
      outputBuffer = ByteBuffer.allocateDirect(maxOutputSize);
    }
    // Reset the previous outputBuffer
    outputBuffer.clear();
    inputBuffer.limit(inputBuffer.position());
    inputBuffer.position(0);

    int size = Snappy.compress(inputBuffer, outputBuffer);
    outputBuffer.limit(size);
    inputBuffer.limit(0);
    inputBuffer.rewind();
  }

  // Return compressed output up to 'len'
  int numBytes = Math.min(len, outputBuffer.remaining());
  outputBuffer.get(buffer, off, numBytes);
  bytesWritten += numBytes;
  return numBytes;
}
```

`org.xerial.snappy.Snappy#compress(java.nio.ByteBuffer, java.nio.ByteBuffer)`:

```java
public static int compress(ByteBuffer uncompressed, ByteBuffer compressed) throws IOException {
  if (!uncompressed.isDirect())
    throw new SnappyError(SnappyErrorCode.NOT_A_DIRECT_BUFFER, "input is not a direct buffer");
  if (!compressed.isDirect())
    throw new SnappyError(SnappyErrorCode.NOT_A_DIRECT_BUFFER, "destination is not a direct buffer");

  // input: uncompressed[pos(), limit())
  // output: compressed
  int uPos = uncompressed.position();
  int uLen = uncompressed.remaining();
  int compressedSize = ((SnappyNativeAPI) impl).rawCompress(uncompressed, uPos, uLen, compressed,
      compressed.position());

  //         pos  limit
  // [ ......BBBBBBB.........]
  compressed.limit(compressed.position() + compressedSize);

  return compressedSize;
}
```

The code above is the snappy compression path. Apart from the input not being allowed to exceed Integer.MAX_VALUE, snappy places no restrictions on the input stream: whatever it is given, it compresses with the same logic, and parquet then writes the compressed stream into the in-memory page.

So the problem is presumably not on the snappy side.

b) parquet

I won't paste the parquet source here; there is a lot of it. Parquet simply takes the reduce output stream, applies different encodings depending on the type and content of the data, assembles its own structures, compresses them with snappy, and writes the result to the file.

One thing to note: parquet.dictionary.page.size affects the encoding. Once the values in the dictionary exceed the configured size, the encoding degrades (it falls back from dictionary encoding to plain).
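A toy illustration of this fallback (my own sketch, not parquet's code; the threshold logic is simplified):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of parquet's dictionary fallback: values are
// dictionary-encoded until the dictionary's byte size exceeds a
// parquet.dictionary.page.size-like threshold, then encoding degrades to PLAIN.
public class DictionaryFallbackSketch {
    static String chooseEncoding(String[] values, int dictionaryPageSize) {
        Map<String, Integer> dict = new LinkedHashMap<>();
        int dictBytes = 0;
        for (String v : values) {
            if (!dict.containsKey(v)) {
                dict.put(v, dict.size());
                dictBytes += v.getBytes().length;
                if (dictBytes > dictionaryPageSize) {
                    return "PLAIN"; // dictionary too big: fall back
                }
            }
        }
        return "PLAIN_DICTIONARY";
    }
}
```

A low-cardinality column such as status stays dictionary-encoded, while a high-cardinality column such as title blows past the limit and degrades to PLAIN, which is the same pattern visible in the logs above.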

c) mapreduce

The main suspect was that, during or after the reduce, adjacent output records become relatively dissimilar ("sparse"), which makes compression less effective. I was not very familiar with this part, though, so it needed a way to be verified.

III. Conclusion

Root cause found: the data is indeed sparser after the reduce, which lowers the compression ratio.

While digging into this two days earlier, I noticed that in the outscan output certain IDs are consecutive, and the title values that follow them are nearly identical rows sitting next to each other. I suspected some tables have a special property: when the IDs are consecutive, the other columns are nearly identical. This was later confirmed.

When a table's consecutive xxx_id values end up in the same reduce, the compression ratio improves dramatically. For example:

| item_sku_id | title |
| --- | --- |
| 11231 | black wool sweater, XL |
| 11232 | black wool sweater, XXL |
| 11233 | remote-control car A |
| 11234 | remote-control car B |

The default partitioner sends 11231 and 11233 to reduce A, and 11232 and 11234 to reduce B, producing two files whose contents have essentially no repetition: the data is no longer contiguous, and the compression ratio suffers.

outscan, by contrast, has no reduce phase: the map output is written directly in order, so consecutive IDs are never split across reduces.
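The effect can be reproduced without Hadoop at all. In this sketch, java.util.zip.Deflater stands in for snappy, rows come in adjacent near-duplicate pairs like the sku example above, and the two toy partitioning schemes stand in for the default partitioner and the custom one:

```java
import java.util.function.IntUnaryOperator;
import java.util.zip.Deflater;

// Hypothetical stand-alone demo: compare total compressed size when
// near-duplicate adjacent rows are split across two partitions versus
// kept together in runs of 100 ids.
public class PartitionCompressionDemo {
    // Compress a byte array and return the compressed size.
    static int deflatedSize(byte[] input) {
        Deflater d = new Deflater();
        d.setInput(input);
        d.finish();
        byte[] out = new byte[input.length + 1024];
        int total = 0;
        while (!d.finished()) {
            total += d.deflate(out);
        }
        d.end();
        return total;
    }

    // Sum of compressed partition sizes when row i goes to partition(i).
    static int totalCompressed(String[] rows, IntUnaryOperator partition) {
        StringBuilder[] parts = { new StringBuilder(), new StringBuilder() };
        for (int i = 0; i < rows.length; i++) {
            parts[partition.applyAsInt(i)].append(rows[i]).append('\n');
        }
        return deflatedSize(parts[0].toString().getBytes())
             + deflatedSize(parts[1].toString().getBytes());
    }

    public static void main(String[] args) {
        // Pairs of adjacent ids share an identical title, like the sku example above.
        String[] rows = new String[4000];
        for (int i = 0; i < rows.length; i++) {
            int product = i / 2; // ids 2k and 2k+1 describe the same product
            rows[i] = "product-" + product + "-" + Integer.toHexString(product * 0x9E3779B9);
        }
        int alternating = totalCompressed(rows, i -> i % 2);         // default-style split
        int contiguous  = totalCompressed(rows, i -> (i / 100) % 2); // id/100 % numPartitions
        System.out.println("alternating=" + alternating + " contiguous=" + contiguous);
        // On this synthetic data, keeping consecutive ids together compresses better.
    }
}
```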

The fix

```java
public int getPartition(Text text, Text value, int numPartitions) {
    long key = Long.parseLong(text.toString().split("_")[0]);
    return (int) (key / 100 % numPartitions);
}
```

We added a partitioner dedicated to tables whose data has this continuity, so that consecutive IDs are sent to the same reduce. Dropping the last two digits (the / 100) is an empirical value obtained by observing how contiguous the data in our DB actually is.
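A quick sanity check of that partition math, with the Hadoop Text key replaced by a plain String (my re-implementation, not the production class):

```java
// Hypothetical re-implementation of the partitioner's math without Hadoop types,
// showing that adjacent ids land in the same partition.
public class PartitionMath {
    static int getPartition(String key, int numPartitions) {
        long id = Long.parseLong(key.split("_")[0]);
        return (int) (id / 100 % numPartitions);
    }
}
```

IDs 11231 through 11234 all map to the same partition, because they all share 112 after the last two digits are dropped.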

IV. Research beyond the fix

Finally, a look at how much parquet's own encodings contribute to parquet's compression ratio.

Output size and encodings after rerunning the ods_itm_wd_inv_app_item_sku_d job with SNAPPY compression enabled and the custom partitioner:

[figure image-20210127165125612]

[figure image-20210127165142007]

Output size and encodings after rerunning the same job with SNAPPY compression disabled and the custom partitioner:

[figure image-20210127165848742]

[figure image-20210127165221964]

Output size and encodings after rerunning the same job with SNAPPY compression disabled and the default partitioner:

[figure image-20210127165830347]

[figure image-20210127165246366]

The numbers above show that parquet's own encodings already compress dense data (similar rows adjacent) better than sparse data: 5.1T in the sparse case versus 3.7T in the dense case.

The two figures below show that, for the all-character title column, ordering has little effect on the encoding chosen: both use PLAIN encoding.

[figure image-20210127165814330]

[figure image-20210127165310098]