MENU

GORM Map 动态写入 ClickHouse 字段

June 24, 2023 • Read: 1588 • Go,ClickHouse

在工作中遇到了一个 Map 写入 ClickHouse 时类型转换的问题,解决后记录如下。

GORM 动态写入 ClickHouse 字段

需求场景

业务上游会生成多个下面这种 JSON 字符串,以及这样的表结构。

{
    "name": "张三",
    "age": 18,
    "phone": 15300000000,
    "sex": 1,
    "ip": "127.0.0.1",
    "time": 1687007258
}

表结构是:

CREATE TABLE gorm.test
(
    `name` String,
    `age` UInt8
)
ENGINE = MergeTree
ORDER BY name
SETTINGS index_granularity = 8192;

在实现写入逻辑时,我们需要考虑这几个业务前提:

  1. 上游提供的 JSON 字段可能会增加
  2. 研发或运维或许会动态的调整数据库的字段

所以我们需要根据 JSON 中有的字段以及数据表中的字段进行动态的匹配写入,这就要求我们编码时不能使用 Struct 结构体来定义 GORM Model,需要将 JSON 变成 Map 然后使用 GORM 写入数据库。

暴露问题

db, _ := gorm.Open(clickhouse.Open("clickhouse://default:@192.168.0.10:19000/gorm"), &gorm.Config{})
    var (
        str  = "{\"name\":\"dbkuaizi\",\"age\":18,\"phone\":15300000000,\"sex\":1,\"ip\":\"127.0.0.1\",\"time\":1687007258}"
        Data map[string]any
    )

    json.Unmarshal([]byte(str), &Data)

    if err := db.Table("test").Select("name", "age").Create(Data).Error; err != nil {
        fmt.Println(err)
    }

运行程序输出了一个错误:

2023/06/24 18:33:27 E:/code/ginweb/main.go:18 clickhouse [AppendRow]: age clickhouse [AppendRow]: converting float64 to UInt8 is unsupported
[1.334ms] [rows:0] INSERT INTO `test` (`age`,`name`) VALUES (18.000000,'dbkuaizi')
clickhouse [AppendRow]: age clickhouse [AppendRow]: converting float64 to UInt8 is unsupported

出错的原因是 ClickHouse 数据库中 age 字段类型是 Uint8,而 Json 反序列化时映射成了 Float64 类型, 接着 GORM 在入库前会对数据的类型进行校验,如果与字段类型不一致,且不能无损转换,就会报错。
(Gorm ClickHouse 驱动实现依赖 ClickHouse官方提供的 go 驱动包,同样也会继承官方 Go 驱动的类型校验逻辑,具体可以参考 https://github.com/ClickHouse/clickhouse-go/blob/main/TYPES.md )

解决方案

这个出错的原因从官方的角度看是非常合理的,但我们数据是从 JSON 反序列化过来的,即使我们明知入库存储不存在范围的问题,也会因为数据类型的原因无法入库。

所以我们如果想实现使用 map[string]any 插入数据到 clickhouse ,就需要根据数据表中实际的字段类型对 map[string]any 进行一次转换。

获取字段类型

实现类型转换的第一步,我们需要首先知道目标表有哪些字段需要写入,以及这些字段的类型。

可以通过读取 clickhouse 的系统表获取字段信息:

SELECT `name`,`type` FROM `system`.columns WHERE `database` = 'gorm' AND `table` = 'test'

-- 结果
name|type  |
----+------+
name|String|
age |UInt8 |

然后我们使用 Gorm 来读取:


// 先定义一个结构体用来获取数据
type FieldType struct {
    FieldName string `gorm:"column:name"`
    FieldType string `gorm:"column:type"`
}

func main() {
    ...
    var DbTypes []FieldType
    
    db.Raw("SELECT `name`,`type` FROM `system`.columns WHERE `database` = ? AND `table` = ?",
        "gorm", "test").Find(&DbTypes)
    // [{FieldName:name FieldType:String} {FieldName:age FieldType:UInt8}]
    fmt.Printf("%+v", DbTypes)
}

数据类型转换

接着我们写一个类型映射的方法,根据 ClickHouse 的字段类型,对 Data 数据类型进行转换。在遍历的同时,记录动态写入的字段,这样就可以根据数据库的字段调整自动写入数据。

// 字段类型映射
func DataTypeMap(DbTypes *[]FieldType, Data *map[string]any) []string {
    // 用一个变量来接受 Select 动态插入字段
    var selectFields []string
    // 遍历 Clickhouse 字段
    for _, field := range *DbTypes {
        // 如果 map 中存在这个字段
        if val, state := (*Data)[field.FieldName]; state {
            // 就加入到 插入字段中
            selectFields = append(selectFields, field.FieldName)
            // 接着对数据进行类型转换
            switch field.FieldType {
            case "Int8":
                (*Data)[field.FieldName] = cast.ToInt8(val)
            case "Int16":
                (*Data)[field.FieldName] = cast.ToInt16(val)
            case "Int32":
                (*Data)[field.FieldName] = cast.ToInt32(val)
            case "Int64":
                (*Data)[field.FieldName] = cast.ToInt64(val)
            case "UInt8":
                (*Data)[field.FieldName] = cast.ToUint8(val)
            case "UInt16":
                (*Data)[field.FieldName] = cast.ToUint16(val)
            case "UInt32":
                (*Data)[field.FieldName] = cast.ToUint32(val)
            case "UInt64":
                (*Data)[field.FieldName] = cast.ToUint64(val)
            case "Decimal":
                (*Data)[field.FieldName] = decimal.NewFromFloat(cast.ToFloat64(val))
            case "String":
                (*Data)[field.FieldName] = cast.ToString(val)
            }

        }

    }
    // 将需要插入的字段进行返回
    return selectFields
}

完整代码示例

package main

import (
    "encoding/json"
    "fmt"
    "github.com/shopspring/decimal"
    "github.com/spf13/cast"
    "gorm.io/driver/clickhouse"
    "gorm.io/gorm"
)

type FieldType struct {
    FieldName string `gorm:"column:name"`
    FieldType string `gorm:"column:type"`
}

func main() {
    db, _ := gorm.Open(clickhouse.Open("clickhouse://default:@192.168.0.10:19000/gorm"), &gorm.Config{})
    var (
        str     = "{\"name\":\"dbkuaizi\",\"age\":18,\"phone\":15300000000,\"sex\":1,\"ip\":\"127.0.0.1\",\"time\":1687007258}"
        Data    map[string]any
        DbTypes []FieldType
    )

    // 获取字段名称和类型
    db.Raw("SELECT `name`,`type` FROM `system`.columns WHERE `database` = ? AND `table` = ?",
        "gorm", "test").Find(&DbTypes)
    // 拿到反序列化后的数据
    json.Unmarshal([]byte(str), &Data)
    // 进行类型转换,并拿到需要动态写入的字段
    selectFields := DataTypeMap(&DbTypes, &Data)
    if err := db.Table("test").Select(selectFields).Create(Data).Error; err != nil {
        fmt.Println(err)
    }

}

// 字段类型映射
func DataTypeMap(DbTypes *[]FieldType, Data *map[string]any) []string {
    // 用一个变量来接受 Select 动态插入字段
    var selectFields []string
    // 遍历 Clickhouse 字段
    for _, field := range *DbTypes {
        // 如果 map 中存在这个字段
        if val, state := (*Data)[field.FieldName]; state {
            // 就加入到 插入字段中
            selectFields = append(selectFields, field.FieldName)
            // 接着对数据进行类型转换(这里只是转换了一部分类型,做个演示)
            switch field.FieldType {
            case "Int8":
                (*Data)[field.FieldName] = cast.ToInt8(val)
            case "Int16":
                (*Data)[field.FieldName] = cast.ToInt16(val)
            case "Int32":
                (*Data)[field.FieldName] = cast.ToInt32(val)
            case "Int64":
                (*Data)[field.FieldName] = cast.ToInt64(val)
            case "UInt8":
                (*Data)[field.FieldName] = cast.ToUint8(val)
            case "UInt16":
                (*Data)[field.FieldName] = cast.ToUint16(val)
            case "UInt32":
                (*Data)[field.FieldName] = cast.ToUint32(val)
            case "UInt64":
                (*Data)[field.FieldName] = cast.ToUint64(val)
            case "Decimal":
                (*Data)[field.FieldName] = decimal.NewFromFloat(cast.ToFloat64(val))
            case "String":
                (*Data)[field.FieldName] = cast.ToString(val)
            }

        }

    }
    // 将需要插入的字段进行返回
    return selectFields
}

动态写入测试

随后修改数据库字段,增加一个 phone 的字段,在不需要调整代码的前提下,数据也可以正常入库。这样就实现了字段动态写入的需求。

name    |age|phone      |
--------+---+-----------+
dbkuaizi| 18|           |
dbkuaizi| 18|15300000000|

需要注意的是,demo 中没有并发场景,所以直接使用了 map 类型。如果实际使用中有并发场景,考虑到并发安全,应使用 sync.Map 类型。

Leave a Comment

已有 1 条评论
  1. 不明觉历哈。@(哈哈)