I have a lot of IPFIX (NetFlow) records being pushed into Kafka, and I have written a consumer for them in Go with this code:

package main

import (
    "context"
    "database/sql"
    "encoding/json"
    "flag"
    "log"
    "sync"
    "time"

    "github.com/ClickHouse/clickhouse-go" // registers the "clickhouse" database/sql driver
    "github.com/segmentio/kafka-go"
)

type options struct {
    Broker  string
    Topic   string
    Debug   bool
    Workers int
}

type dataField struct {
    I int
    V interface{}
}

type Header struct {
    Version    int
    Length     int
    ExportTime int64
    SequenceNo int
    DomainID   int
}

type ipfix struct {
    AgentID  string
    Header   Header
    DataSets [][]dataField
}

type dIPFIXSample struct {
    device                          string
    sourceIPv4Address               string
    sourceTransportPort             uint64
    postNATSourceIPv4Address        string
    postNATSourceTransportPort      uint64
    destinationIPv4Address          string
    postNATDestinationIPv4Address   string
    postNATDestinationTransportPort uint64
    dstport                         uint64
    timestamp                       string
    postNATSourceIPv6Address        string
    postNATDestinationIPv6Address   string
    sourceIPv6Address               string
    destinationIPv6Address          string
    proto                           uint8
    login                           string
    sessionid                       uint64
}

var opts options

func init() {
    flag.StringVar(&opts.Broker, "broker", "172.18.0.4:9092", "broker ipaddress:port")
    flag.StringVar(&opts.Topic, "topic", "vflow.ipfix", "kafka topic")
    flag.BoolVar(&opts.Debug, "debug", true, "enable/disable debug")
    flag.IntVar(&opts.Workers, "workers", 16, "workers number / partition number")

    flag.Parse()
}


func main() {
    var (
        wg sync.WaitGroup
        ch = make(chan ipfix, 10000)
    )

    for i := 0; i < 5; i++ {
        go ingestClickHouse(ch)
    }

    wg.Add(opts.Workers)

    for i := 0; i < opts.Workers; i++ {
        go func(ti int) {
            defer wg.Done()

            // create a new kafka reader with the broker and topic
            r := kafka.NewReader(kafka.ReaderConfig{
                Brokers: []string{opts.Broker},
                Topic:   opts.Topic,
                GroupID: "mygroup",
                // start consuming from the earliest message
                StartOffset: kafka.FirstOffset,
            })
            defer r.Close()

            pCount := 0
            count := 0
            tik := time.Tick(10 * time.Second)

            for {
                select {
                case <-tik:
                    if opts.Debug {
                        log.Printf("partition GroupId#%d,  rate=%d\n", ti, (count-pCount)/10)
                    }
                    pCount = count
                default:
                    // read the next message from kafka
                    m, err := r.ReadMessage(context.Background())
                    if err != nil {
                        if err == kafka.ErrGenerationEnded {
                            log.Println("generation ended")
                            return
                        }
                        log.Println(err)
                        continue
                    }
                    // unmarshal the message into an ipfix struct
                    objmap := ipfix{}
                    if err := json.Unmarshal(m.Value, &objmap); err != nil {
                        log.Println(err)
                        continue
                    }

                    // hand the decoded record to the ingestClickHouse goroutines
                    ch <- objmap

                    // mark the message as processed
                    if err := r.CommitMessages(context.Background(), m); err != nil {
                        log.Println(err)
                        continue
                    }

                    count++
                }
            }
        }(i)
    }

    wg.Wait()
}


func ingestClickHouse(ch chan ipfix) {
    var sample ipfix

    connect, err := sql.Open("clickhouse", "tcp://127.0.0.1:9000?debug=true&username=default&password=wawa123")
    if err != nil {
        log.Fatal(err)
    }
    if err := connect.Ping(); err != nil {
        if exception, ok := err.(*clickhouse.Exception); ok {
            log.Printf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
        } else {
            log.Println(err)
        }
        return
    }
    defer connect.Close()
    for {
        tx, err := connect.Begin()
        if err != nil {
            log.Fatal(err)
        }
        stmt, err := tx.Prepare("INSERT INTO natdb.natlogs (timestamp, router_ip, sourceIPv4Address, sourceTransportPort, postNATSourceIPv4Address, postNATSourceTransportPort, destinationIPv4Address, dstport, postNATDestinationIPv4Address, postNATDestinationTransportPort, postNATSourceIPv6Address, postNATDestinationIPv6Address, sourceIPv6Address, destinationIPv6Address, proto, login) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
        if err != nil {
            log.Fatal(err)
        }


        for i := 0; i < 10000; i++ {

            sample = <-ch
            for _, data := range sample.DataSets {
                s := dIPFIXSample{}
                for _, dd := range data {
                    switch dd.I {
                    case 8:
                        s.sourceIPv4Address = dd.V.(string)
                    case 7:
                        s.sourceTransportPort =uint64( dd.V.(float64))
                    case 225:
                        s.postNATSourceIPv4Address =  dd.V.(string)
                    case 227:
                        s.postNATSourceTransportPort = uint64(dd.V.(float64))
                    case 12:
                          s.destinationIPv4Address=dd.V.(string)
                    case 11:
                          s.dstport=uint64(dd.V.(float64))
                    case 226:
                          s.postNATDestinationIPv4Address=dd.V.(string)
                    case 27:
                          s.sourceIPv6Address=dd.V.(string)
                    case 28:
                          s.destinationIPv6Address=dd.V.(string)
                    case 281:
                          s.postNATSourceIPv6Address=dd.V.(string)
                    case 282:
                          s.postNATDestinationIPv6Address=dd.V.(string) 
                    case 2003:
                          s.login =dd.V.(string)
                          log.Printf(dd.V.(string))   
                    case 228:
                          s.postNATDestinationTransportPort=uint64(dd.V.(float64))                      
                    case 4:
                        s.proto = uint8(dd.V.(float64))
                    }
                }
                timestamp := time.Unix(sample.Header.ExportTime, 0).Format("2006-01-02 15:04:05")


 

                if _, err := stmt.Exec(
 
                    timestamp,
                     
                    sample.AgentID,
                    s.sourceIPv4Address,
                    s.sourceTransportPort,
                    s.postNATSourceIPv4Address,
                    s.postNATSourceTransportPort,
                    s.destinationIPv4Address,
                    s.dstport,
                    s.postNATDestinationIPv4Address,
                    s.postNATDestinationTransportPort,
                                        s.postNATSourceIPv6Address,
                                        s.postNATDestinationIPv6Address,
                                        s.sourceIPv6Address,
                                        s.destinationIPv6Address,
                    s.proto,
                                        s.login,
                ); err != nil {
                    log.Fatal(err)
                }

}



     }
        go func(tx *sql.Tx) {
            if err := tx.Commit(); err != nil {
                log.Fatal(err)
            }
        }(tx)


    }
}

The code works and I can insert data into ClickHouse, but because of the high traffic volume going into Kafka there is a lag between Kafka and ClickHouse that grows as traffic increases; it is now more than 20 hours. Can you recommend any way to make this faster? This is my ClickHouse table:

CREATE TABLE natdb.natlogs
(
    `timestamp` DateTime,
    `router_ip` String,
    `sourceIPv4Address` String,
    `sourceTransportPort` UInt64,
    `postNATSourceIPv4Address` String,
    `postNATSourceTransportPort` UInt64,
    `destinationIPv4Address` String,
    `dstport` UInt64,
    `postNATDestinationIPv4Address` String,
    `postNATDestinationTransportPort` UInt64,
    `proto` UInt8,
    `login` String,
    `sessionid` String,
    `sourceIPv6Address` String,
    `destinationIPv6Address` String,
    `postNATSourceIPv6Address` String,
    `postNATDestinationIPv6Address` String,
    INDEX idx_natlogs_router_source_time_postnat (router_ip, sourceIPv4Address, timestamp, postNATSourceIPv4Address) TYPE minmax GRANULARITY 1
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY router_ip
SETTINGS index_granularity = 8192

I am looking for a faster way to insert this data into ClickHouse. Thanks in advance.

I have tried the Go consumer and it inserts data fine; it can insert more than 2 million records in 5 minutes. The problem is that more than 20 million records arrive in Kafka every 5 minutes, so the lag between Kafka and ClickHouse keeps growing.
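For context, the ingestion loop above always blocks on <-ch until a fixed 10000-row batch fills up. A common variant is to flush on either batch size or a timeout, so a half-full batch never sits open. Below is a minimal sketch of that pattern, reusing the types and the natdb.natlogs column list from the code above; batchWriter and the extract closure (which would wrap the existing switch statement from ingestClickHouse) are illustrative names, not part of the original program:

func batchWriter(db *sql.DB, ch <-chan ipfix, batchSize int, flushEvery time.Duration,
    extract func([]dataField) dIPFIXSample) {
    const insertSQL = "INSERT INTO natdb.natlogs (timestamp, router_ip, sourceIPv4Address, sourceTransportPort, postNATSourceIPv4Address, postNATSourceTransportPort, destinationIPv4Address, dstport, postNATDestinationIPv4Address, postNATDestinationTransportPort, postNATSourceIPv6Address, postNATDestinationIPv6Address, sourceIPv6Address, destinationIPv6Address, proto, login) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"

    batch := make([]ipfix, 0, batchSize)

    // flush writes the accumulated samples as a single transaction,
    // which ClickHouse turns into one insert block
    flush := func() {
        if len(batch) == 0 {
            return
        }
        tx, err := db.Begin()
        if err != nil {
            log.Println(err)
            return
        }
        stmt, err := tx.Prepare(insertSQL)
        if err != nil {
            log.Println(err)
            tx.Rollback()
            return
        }
        for _, sample := range batch {
            ts := time.Unix(sample.Header.ExportTime, 0).Format("2006-01-02 15:04:05")
            for _, data := range sample.DataSets {
                s := extract(data) // hypothetical helper: the switch from ingestClickHouse
                if _, err := stmt.Exec(ts, sample.AgentID,
                    s.sourceIPv4Address, s.sourceTransportPort,
                    s.postNATSourceIPv4Address, s.postNATSourceTransportPort,
                    s.destinationIPv4Address, s.dstport,
                    s.postNATDestinationIPv4Address, s.postNATDestinationTransportPort,
                    s.postNATSourceIPv6Address, s.postNATDestinationIPv6Address,
                    s.sourceIPv6Address, s.destinationIPv6Address,
                    s.proto, s.login); err != nil {
                    log.Println(err)
                }
            }
        }
        if err := tx.Commit(); err != nil {
            log.Println(err)
        }
        batch = batch[:0]
    }

    ticker := time.NewTicker(flushEvery)
    defer ticker.Stop()
    for {
        select {
        case sample := <-ch:
            batch = append(batch, sample)
            if len(batch) >= batchSize {
                flush()
            }
        case <-ticker.C:
            flush()
        }
    }
}

MergeTree generally prefers fewer, larger inserts, so tens of thousands of rows per commit with a bounded flush interval tends to beat many small transactions.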

Recommended answer

After a lot of research, I repartitioned my Kafka topic, which made the consumers work much faster; now I am able to get the data into ClickHouse in near real time. I just ran this command against Kafka and it worked like a charm:

kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic vflow.ipfix --partitions 16
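This matters because a consumer group assigns at most one member per partition, so with a single partition only one of the 16 workers was ever reading. If you want to verify the partition count from Go, something like this with the same kafka-go library should work (a sketch; the broker address is the one from the flags above):

package main

import (
    "log"

    "github.com/segmentio/kafka-go"
)

func main() {
    // dial any broker and list the partitions of the topic; the number
    // of consumer-group workers should not exceed this count
    conn, err := kafka.Dial("tcp", "172.18.0.4:9092")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    partitions, err := conn.ReadPartitions("vflow.ipfix")
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("vflow.ipfix has %d partitions", len(partitions))
}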
