- Release Notes
- Get Started
- Clusters
- Cloud Settings
- Table Type
- Query Data Lakes
- Integration
- Query Acceleration
- Data Loading
- Concepts
- Batch load data from Amazon S3
- Batch load data from Azure cloud storage
- Load data from a local file system
- Load data from Confluent Cloud
- Load data from Amazon MSK
- Load data from Amazon Kinesis
- Data Unloading
- Data Backup
- Security
- Console Access Control
- Data Access Control
- Application keys
- Service accounts
- Use SSL connection
- Alarm
- Usage and Billing
- Organizations and Accounts
- Reference
- Amazon Web Services (AWS)
- Microsoft Azure
- SQL Reference
- Keywords
- ALL statements
- User Account Management
- Cluster Management
- ADMIN CANCEL REPAIR
- ADMIN CHECK TABLET
- ADMIN REPAIR
- ADMIN SET CONFIG
- ADMIN SET REPLICA STATUS
- ADMIN SHOW CONFIG
- ADMIN SHOW REPLICA DISTRIBUTION
- ADMIN SHOW REPLICA STATUS
- ALTER RESOURCE GROUP
- ALTER SYSTEM
- CANCEL DECOMMISSION
- CREATE FILE
- CREATE RESOURCE GROUP
- DROP FILE
- DROP RESOURCE GROUP
- EXPLAIN
- INSTALL PLUGIN
- SET
- SHOW BACKENDS
- SHOW BROKER
- SHOW COMPUTE NODES
- SHOW FRONTENDS
- SHOW FULL COLUMNS
- SHOW INDEX
- SHOW PLUGINS
- SHOW PROCESSLIST
- SHOW RESOURCE GROUP
- SHOW TABLE STATUS
- SHOW FILE
- SHOW VARIABLES
- UNINSTALL PLUGIN
- DDL
- ALTER DATABASE
- ALTER MATERIALIZED VIEW
- ALTER TABLE
- ALTER VIEW
- ANALYZE TABLE
- BACKUP
- CANCEL ALTER TABLE
- CANCEL BACKUP
- CANCEL RESTORE
- CREATE ANALYZE
- CREATE DATABASE
- CREATE EXTERNAL CATALOG
- CREATE INDEX
- CREATE MATERIALIZED VIEW
- CREATE REPOSITORY
- CREATE TABLE AS SELECT
- CREATE TABLE LIKE
- CREATE TABLE
- CREATE VIEW
- CREATE FUNCTION
- DROP ANALYZE
- DROP STATS
- DROP CATALOG
- DROP DATABASE
- DROP INDEX
- DROP MATERIALIZED VIEW
- DROP REPOSITORY
- DROP TABLE
- DROP VIEW
- DROP FUNCTION
- KILL ANALYZE
- RECOVER
- REFRESH EXTERNAL TABLE
- RESTORE
- SET CATALOG
- SHOW ANALYZE JOB
- SHOW ANALYZE STATUS
- SHOW META
- SHOW FUNCTION
- TRUNCATE TABLE
- USE
- DML
- ALTER LOAD
- ALTER ROUTINE LOAD
- BROKER LOAD
- CANCEL LOAD
- CANCEL EXPORT
- CANCEL REFRESH MATERIALIZED VIEW
- CREATE ROUTINE LOAD
- DELETE
- EXPORT
- GROUP BY
- INSERT
- PAUSE ROUTINE LOAD
- RESUME ROUTINE LOAD
- REFRESH MATERIALIZED VIEW
- SELECT
- SHOW ALTER
- SHOW ALTER MATERIALIZED VIEW
- SHOW BACKUP
- SHOW CATALOGS
- SHOW CREATE CATALOG
- SHOW CREATE MATERIALIZED VIEW
- SHOW CREATE TABLE
- SHOW CREATE VIEW
- SHOW DATA
- SHOW DATABASES
- SHOW DELETE
- SHOW DYNAMIC PARTITION TABLES
- SHOW EXPORT
- SHOW LOAD
- SHOW MATERIALIZED VIEW
- SHOW PARTITIONS
- SHOW REPOSITORIES
- SHOW RESTORE
- SHOW ROUTINE LOAD
- SHOW ROUTINE LOAD TASK
- SHOW SNAPSHOT
- SHOW TABLES
- SHOW TABLET
- SHOW TRANSACTION
- STOP ROUTINE LOAD
- STREAM LOAD
- SUBMIT TASK
- UPDATE
- Auxiliary Commands
- Data Types
- Keywords
- SQL Functions
- Function list
- Java UDFs
- Window functions
- Lambda expression
- Date Functions
- add_months
- adddate
- convert_tz
- current_date
- current_time
- current_timestamp
- date
- date_add
- date_diff
- date_format
- date_slice
- date_sub, subdate
- date_trunc
- datediff
- day
- dayofweek_iso
- dayname
- dayofmonth
- dayofweek
- dayofyear
- days_add
- days_diff
- days_sub
- from_days
- from_unixtime
- hour
- hours_add
- hours_diff
- hours_sub
- jodatime_format
- last_day
- makedate
- microseconds_add
- microseconds_sub
- minute
- minutes_add
- minutes_diff
- minutes_sub
- month
- monthname
- months_add
- months_diff
- months_sub
- next_day
- now
- previous_day
- quarter
- second
- seconds_add
- seconds_diff
- seconds_sub
- str_to_date
- str_to_jodatime
- str2date
- time_slice
- time_to_sec
- timediff
- timestamp
- timestampadd
- timestampdiff
- to_date
- to_days
- to_iso8601
- to_tera_date
- to_tera_timestamp
- unix_timestamp
- utc_timestamp
- week
- week_iso
- weekofyear
- weeks_add
- weeks_diff
- weeks_sub
- year
- years_add
- years_diff
- years_sub
- Aggregate Functions
- any_value
- approx_count_distinct
- array_agg
- avg
- bitmap
- bitmap_agg
- count
- count_if
- corr
- covar_pop
- covar_samp
- group_concat
- grouping
- grouping_id
- hll_empty
- hll_hash
- hll_raw_agg
- hll_union
- hll_union_agg
- max
- max_by
- min
- min_by
- multi_distinct_sum
- multi_distinct_count
- percentile_approx
- percentile_cont
- percentile_disc
- retention
- stddev
- stddev_samp
- sum
- variance, variance_pop, var_pop
- var_samp
- window_funnel
- Geographic Functions
- String Functions
- append_trailing_char_if_absent
- ascii
- char
- char_length
- character_length
- concat
- concat_ws
- ends_with
- find_in_set
- group_concat
- hex
- hex_decode_binary
- hex_decode_string
- instr
- lcase
- left
- length
- locate
- lower
- lpad
- ltrim
- money_format
- null_or_empty
- parse_url
- repeat
- replace
- reverse
- right
- rpad
- rtrim
- space
- split
- split_part
- substring_index
- starts_with
- strleft
- strright
- str_to_map
- substring
- trim
- ucase
- unhex
- upper
- url_decode
- url_encode
- Pattern Matching Functions
- JSON Functions
- Overview of JSON functions and operators
- JSON operators
- JSON constructor functions
- JSON query and processing functions
- Bit Functions
- Bitmap Functions
- Array Functions
- all_match
- any_match
- array_agg
- array_append
- array_avg
- array_concat
- array_contains
- array_contains_all
- array_cum_sum
- array_difference
- array_distinct
- array_filter
- array_generate
- array_intersect
- array_join
- array_length
- array_map
- array_max
- array_min
- array_position
- array_remove
- array_slice
- array_sort
- array_sortby
- array_sum
- arrays_overlap
- array_to_bitmap
- cardinality
- element_at
- reverse
- unnest
- Map Functions
- Binary Functions
- cast function
- hash function
- Cryptographic Functions
- Math Functions
- Pattern Matching Functions
- Percentile Functions
- Scalar Functions
- Struct Functions
- Table Functions
- Utility Functions
- AUTO_INCREMENT
- Generated columns
- System variables
- System limits
- Information Schema
- Overview
- be_bvars
- be_cloud_native_compactions
- be_compactions
- character_sets
- collations
- column_privileges
- columns
- engines
- events
- global_variables
- key_column_usage
- load_tracking_logs
- loads
- materialized_views
- partitions
- pipe_files
- pipes
- referential_constraints
- routines
- schema_privileges
- schemata
- session_variables
- statistics
- table_constraints
- table_privileges
- tables
- tables_config
- task_runs
- tasks
- triggers
- user_privileges
- views
- System Metadatabase
- API
- Overview
- Actions
- Clusters
- Create and Manage Clusters
- Query Clusters
- Identity and Access Management
- Organization and Account
- Usage and Billing
- Clusters
- Terraform Provider
- Run scripts
Use HLL for approximate count distinct
Background
In a real-world scenario, the pressure to de-duplicate the data increases as the volume of data increases. When the size of data reaches a certain level, the cost of accurate de-duplication is relatively high. In this case, users usually use approximate algorithms to reduce the computational pressure. HyperLogLog (HLL), which will be introduced in this section, is an approximate de-duplication algorithm that has excellent space complexity O(mloglogn) and time complexity O(n). In addition, the error rate of the computation result can be controlled to about 1%-10%, depending on the size of the data set and the hash function used.
What is HyperLogLog
HyperLogLog is an approximate de-duplication algorithm that consumes very little storage space. The HLL type is used for implementing the HyperLogLog algorithm. It holds the intermediate results of the HyperLogLog calculation and can only be used as an indicator column type for data tables.
Since the HLL algorithm involves a lot of mathematical knowledge, we will use a practical example to illustrate it. Suppose we design a randomized experiment A, that is to do independent repetitions of coin flips until the first head; record the number of coin flips for the first head as a random variable X, then:
- X=1, P(X=1)=1/2
- X=2, P(X=2)=1/4
- ...
- X=n, P(X=n)=(1/2)n
We use test A to construct randomized test B which is to do N independent repetitions of test A, generating N independent identically distributed random variables X1, X2, X3, ..., XN.Take the maximum value of the random variables as Xmax. Leveraging the great likelihood estimation, the estimated value of N is 2Xmax.
Now, we simulate the above experiment using the hash function on the given dataset:
- Test A: Calculate the hash value of dataset elements and convert the hash value to binary representation. Record the occurrence of bit=1, starting from the lowest bit of the binary.
- Test B: Repeat the Test A process for dataset elements of Test B. Update the maximum position "m" of the first bit 1 occurrence for each test;
- Estimate the number of non-repeating elements in the dataset as m2.
In fact, the HLL algorithm divides the elements into K=2k buckets based on the lower k bits of the element hash. Count the maximum value of the first bit 1 occurrence from the k+1st bit as m1, m2,..., mk, and estimate the number of non-repeating elements in the bucket as 2m1, 2m2,..., 2mk. The number of non-repeating elements in the data set is the summed average of the number of buckets multiplied by the number of non-repeating elements in the buckets: N = K(K/(2-m1+2-m2,..., 2-mK)).
HLL multiplies the correction factor with the estimation result to make the result more accurate.
Refer to the article https://gist.github.com/avibryant/8275649 on implementing HLL de-duplication algorithm with CelerData SQL statements:
SELECT floor((0.721 * 1024 * 1024) / (sum(pow(2, m * -1)) + 1024 - count(*))) AS estimate
FROM(select(murmur_hash3_32(c2) & 1023) AS bucket,
max((31 - CAST(log2(murmur_hash3_32(c2) & 2147483647) AS INT))) AS m
FROM db0.table0
GROUP BY bucket) bucket_values
This algorithm de-duplicates col2 of db0.table0.
- Use the hash function
murmur_hash3_32
to calculate the hash value of col2 as a 32-signed integer. - Use 1024 buckets, the correction factor is 0.721, and take the lower 10 bits of the hash value as the subscript of the bucket.
- Ignore the sign bit of the hash value, start from the next highest bit to the lower bit, and determine the position of the first bit 1 occurrence.
- Group the calculated hash values by bucket, and use the
MAX
aggregation to find the maximum position of the first bit 1 occurrence in the bucket. - The aggregation result is used as a subquery, and the summed average of all bucket estimates is multiplied by the number of buckets and the correction factor.
- Note that the empty bucket count is 1.
The above algorithm has a very low error rate when the data volume is large.
This is the core idea of the HLL algorithm. Please refer to the HyperLogLog paper if you are interested.
How to use HyperLogLog
- To use HyperLogLog de-duplication, you need to set the target indicator column type to ‘HLL’ and the aggregation function to
HLL_UNION
in the table creation statement. - Currently, only the aggregation table supports HLL as indicator column type.
- When using
count distinct
on columns of the HLL type, CelerData will automatically convert it to theHLL_UNION_AGG
calculation.
Examples
First, create a table with HLL columns, where uv is an aggregated column, the column type is HLL
, and the aggregation function is HLL_UNION
CREATE TABLE test(
dt DATE,
id INT,
uv HLL HLL_UNION
)
DISTRIBUTED BY HASH(ID) BUCKETS 32;
- Note: When the data volume is large, it is better to create a corresponding rollup table for high frequency HLL queries
Importing data in Stream Load mode:
curl --location-trusted -u username:password -H "label:label_1600997542287" \
-H "column_separator:," \
-H "columns:dt,id,user_id, uv=hll_hash(user_id)"
-T test.csv
http://celerdata_be0:8040/api/db0/test/_stream_load
{
"TxnId": 2504748,
"Label": "label_1600997542287",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 5,
"NumberLoadedRows": 5,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 120,
"LoadTimeMs": 46,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 1,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 29,
"CommitAndPublishTimeMs": 14
}
Load data by using BROKER LOAD:
LOAD LABEL test_db.label
(
DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
INTO TABLE `test`
COLUMNS TERMINATED BY ","
(dt, id, user_id)
SET (
uv = HLL_HASH(user_id)
)
);
Querying data
- The HLL column does not allow direct query of its original value, use the function
HLL_UNION_AGG
to query - To find the total uv
SELECT HLL_UNION_AGG(uv) FROM test;
This statement is equivalent to
SELECT COUNT(DISTINCT uv) FROM test;
- Query for uv of everyday
SELECT COUNT(DISTINCT uv) FROM test GROUP BY ID;
Cautions
How should I choose between Bitmap and HLL? If the base of the dataset is in the millions or tens of millions, and you have a few dozen machines, use count distinct
. If the base is in the hundreds of millions and needs to be accurately de-duplicated, useBitmap
; if approximate de-duplication is acceptable, use the HLL
type.
Bitmap only supports TINYINT, SMALLINT, INT, and BIGINT. Note that LARGEINT is not supported. For other types of data sets to be de-duplicated, a dictionary needs to be built to map the original type to an integer type. Building a dictionary is complex, and requires a trade-off between data volume, update frequency, query efficiency, storage, and other issues. HLL does not need a dictionary, but it needs the corresponding data type to support the hash function. Even in an analytical system that does not support HLL internally, it is still possible to use the hash function and SQL to implement HLL de-duplication.
For common columns, users can use the NDV function for approximate de-duplication. This function returns an approximate aggregation of COUNT(DISTINCT col) results, and the underlying implementation converts the data storage type to the HyperLogLog type for calculation. The NDV function consumes a lot of resources when calculating and is therefore not well suited for high concurrency scenarios.
If you wish to perform user behavior analysis, you may consider IntersectCount or custom UDAF.