在工作中遇到了一个 Map 写入 ClickHouse 时类型转换的问题,解决后记录如下。
GORM 动态写入 ClickHouse 字段
需求场景
业务上游会生成多个下面这种 JSON 字符串,以及这样的表结构。
{
"name": "张三",
"age": 18,
"phone": 15300000000,
"sex": 1,
"ip": "127.0.0.1",
"time": 1687007258
}
表结构是:
CREATE TABLE gorm.test
(
`name` String,
`age` UInt8
)
ENGINE = MergeTree
ORDER BY name
SETTINGS index_granularity = 8192;
在实现写入逻辑时,我们需要考虑这几个业务前提:
- 上游提供的 JSON 字段可能会增加
- 研发或运维或许会动态的调整数据库的字段
所以我们需要根据 JSON 中有的字段以及数据表中的字段进行动态的匹配写入,这就要求我们编码时不能使用 Struct 结构体来定义 GORM Model,需要将 JSON 变成 Map 然后使用 GORM 写入数据库。
暴露问题
db, _ := gorm.Open(clickhouse.Open("clickhouse://default:@192.168.0.10:19000/gorm"), &gorm.Config{})
var (
str = "{\"name\":\"dbkuaizi\",\"age\":18,\"phone\":15300000000,\"sex\":1,\"ip\":\"127.0.0.1\",\"time\":1687007258}"
Data map[string]any
)
json.Unmarshal([]byte(str), &Data)
if err := db.Table("test").Select("name", "age").Create(Data).Error; err != nil {
fmt.Println(err)
}
运行程序输出了一个错误:
2023/06/24 18:33:27 E:/code/ginweb/main.go:18 clickhouse [AppendRow]: age clickhouse [AppendRow]: converting float64 to UInt8 is unsupported
[1.334ms] [rows:0] INSERT INTO `test` (`age`,`name`) VALUES (18.000000,'dbkuaizi')
clickhouse [AppendRow]: age clickhouse [AppendRow]: converting float64 to UInt8 is unsupported
出错的原因是 ClickHouse 数据库中 age 字段类型是 Uint8,而 Json 反序列化时映射成了 Float64 类型, 接着 GORM 在入库前会对数据的类型进行校验,如果与字段类型不一致,且不能无损转换,就会报错。
(Gorm ClickHouse 驱动实现依赖 ClickHouse官方提供的 go 驱动包,同样也会继承官方 Go 驱动的类型校验逻辑,具体可以参考 https://github.com/ClickHouse/clickhouse-go/blob/main/TYPES.md )
(Gorm ClickHouse 驱动实现依赖 ClickHouse官方提供的 go 驱动包,同样也会继承官方 Go 驱动的类型校验逻辑,具体可以参考 https://github.com/ClickHouse/clickhouse-go/blob/main/TYPES.md )
解决方案
这个出错的原因从官方的角度看是非常合理的,但我们数据是从 JSON 反序列化过来的,即使我们明知入库存储不存在范围的问题,也会因为数据类型的原因无法入库。
所以我们如果想实现使用 map[string]any
插入数据到 clickhouse ,就需要根据数据表中实际的字段类型对 map[string]any
进行一次转换。
获取字段类型
实现类型转换的第一步,我们需要首先知道目标表有哪些字段需要写入,以及这些字段的类型。
可以通过读取 clickhouse 的系统表获取字段信息:
SELECT `name`,`type` FROM `system`.columns WHERE `database` = 'gorm' AND `table` = 'test'
-- 结果
name|type |
----+------+
name|String|
age |UInt8 |
然后我们使用 Gorm 来读取:
// 先定义一个结构体用来获取数据
type FieldType struct {
FieldName string `gorm:"column:name"`
FieldType string `gorm:"column:type"`
}
func main() {
...
var DbTypes []FieldType
db.Raw("SELECT `name`,`type` FROM `system`.columns WHERE `database` = ? AND `table` = ?",
"gorm", "test").Find(&DbTypes)
// [{FieldName:name FieldType:String} {FieldName:age FieldType:UInt8}]
fmt.Printf("%+v", DbTypes)
}
数据类型转换
接着我们写一个类型映射的方法,根据 ClickHouse 的字段类型,对 Data 数据类型进行转换。在遍历的同时,记录动态写入的字段,这样就可以根据数据库的字段调整自动写入数据。
// 字段类型映射
func DataTypeMap(DbTypes *[]FieldType, Data *map[string]any) []string {
// 用一个变量来接受 Select 动态插入字段
var selectFields []string
// 遍历 Clickhouse 字段
for _, field := range *DbTypes {
// 如果 map 中存在这个字段
if val, state := (*Data)[field.FieldName]; state {
// 就加入到 插入字段中
selectFields = append(selectFields, field.FieldName)
// 接着对数据进行类型转换
switch field.FieldType {
case "Int8":
(*Data)[field.FieldName] = cast.ToInt8(val)
case "Int16":
(*Data)[field.FieldName] = cast.ToInt16(val)
case "Int32":
(*Data)[field.FieldName] = cast.ToInt32(val)
case "Int64":
(*Data)[field.FieldName] = cast.ToInt64(val)
case "UInt8":
(*Data)[field.FieldName] = cast.ToUint8(val)
case "UInt16":
(*Data)[field.FieldName] = cast.ToUint16(val)
case "UInt32":
(*Data)[field.FieldName] = cast.ToUint32(val)
case "UInt64":
(*Data)[field.FieldName] = cast.ToUint64(val)
case "Decimal":
(*Data)[field.FieldName] = decimal.NewFromFloat(cast.ToFloat64(val))
case "String":
(*Data)[field.FieldName] = cast.ToString(val)
}
}
}
// 将需要插入的字段进行返回
return selectFields
}
完整代码示例
package main
import (
"encoding/json"
"fmt"
"github.com/shopspring/decimal"
"github.com/spf13/cast"
"gorm.io/driver/clickhouse"
"gorm.io/gorm"
)
type FieldType struct {
FieldName string `gorm:"column:name"`
FieldType string `gorm:"column:type"`
}
func main() {
db, _ := gorm.Open(clickhouse.Open("clickhouse://default:@192.168.0.10:19000/gorm"), &gorm.Config{})
var (
str = "{\"name\":\"dbkuaizi\",\"age\":18,\"phone\":15300000000,\"sex\":1,\"ip\":\"127.0.0.1\",\"time\":1687007258}"
Data map[string]any
DbTypes []FieldType
)
// 获取字段名称和类型
db.Raw("SELECT `name`,`type` FROM `system`.columns WHERE `database` = ? AND `table` = ?",
"gorm", "test").Find(&DbTypes)
// 拿到反序列化后的数据
json.Unmarshal([]byte(str), &Data)
// 进行类型转换,并拿到需要动态写入的字段
selectFields := DataTypeMap(&DbTypes, &Data)
if err := db.Table("test").Select(selectFields).Create(Data).Error; err != nil {
fmt.Println(err)
}
}
// 字段类型映射
func DataTypeMap(DbTypes *[]FieldType, Data *map[string]any) []string {
// 用一个变量来接受 Select 动态插入字段
var selectFields []string
// 遍历 Clickhouse 字段
for _, field := range *DbTypes {
// 如果 map 中存在这个字段
if val, state := (*Data)[field.FieldName]; state {
// 就加入到 插入字段中
selectFields = append(selectFields, field.FieldName)
// 接着对数据进行类型转换(这里只是转换了一部分类型,做个演示)
switch field.FieldType {
case "Int8":
(*Data)[field.FieldName] = cast.ToInt8(val)
case "Int16":
(*Data)[field.FieldName] = cast.ToInt16(val)
case "Int32":
(*Data)[field.FieldName] = cast.ToInt32(val)
case "Int64":
(*Data)[field.FieldName] = cast.ToInt64(val)
case "UInt8":
(*Data)[field.FieldName] = cast.ToUint8(val)
case "UInt16":
(*Data)[field.FieldName] = cast.ToUint16(val)
case "UInt32":
(*Data)[field.FieldName] = cast.ToUint32(val)
case "UInt64":
(*Data)[field.FieldName] = cast.ToUint64(val)
case "Decimal":
(*Data)[field.FieldName] = decimal.NewFromFloat(cast.ToFloat64(val))
case "String":
(*Data)[field.FieldName] = cast.ToString(val)
}
}
}
// 将需要插入的字段进行返回
return selectFields
}
动态写入测试
随后修改数据库字段,增加一个 phone 的字段,在不需要调整代码的前提下,数据也可以正常入库。这样就实现了字段动态写入的需求。
name |age|phone |
--------+---+-----------+
dbkuaizi| 18| |
dbkuaizi| 18|15300000000|
需要注意的是,demo 中没有并发场景,所以直接使用了
map
类型。如果实际使用中有并发场景,考虑到并发安全,应使用 sync.Map
类型。
不明觉历哈。@(哈哈)