卡桑德拉中全表扫描中的问题

2022-09-04 03:21:11

首先:我知道在Cassandra进行全面扫描不是一个好主意,但是,目前,这就是我需要的。

当我开始寻找做这样的事情时,我读到人们说不可能在Cassandra中进行全面扫描,他不是被要求做这种事情的。

不满意,我一直在寻找,直到我发现这篇文章:http://www.myhowto.org/bigdata/2013/11/04/scanning-the-entire-cassandra-column-family-with-cql/

看起来很合理,我试了一下。由于我将只执行一次此完全扫描,并且时间和性能不是问题,因此我编写了查询并将其放入一个简单的作业中以查找所需的所有记录。从20亿行记录中,大约1000行是我的预期输出,但是,我只有100条记录。

我的工作:

public void run() {
    Cluster cluster = getConnection();
    Session session = cluster.connect("db");

    LOGGER.info("Starting ...");

    boolean run = true;
    int print = 0;

    while ( run ) {
        if (maxTokenReached(actualToken)) {
            LOGGER.info("Max Token Reached!");
            break;
        }
        ResultSet resultSet = session.execute(queryBuilder(actualToken));

        Iterator<Row> rows = resultSet.iterator();
        if ( !rows.hasNext()){
            break;
        }

        List<String> rowIds = new ArrayList<String>();

        while (rows.hasNext()) {
            Row row = rows.next();

            Long leadTime = row.getLong("my_column");
            if (myCondition(myCollumn)) {
                String rowId = row.getString("key");
                rowIds.add(rowId);
            }

            if (!rows.hasNext()) {
                Long token = row.getLong("token(rowid)");
                if (!rowIds.isEmpty()) {
                    LOGGER.info(String.format("Keys found! RowId's: %s ", rowIds));
                }
                actualToken = nextToken(token);
            }

        }

    }
    LOGGER.info("Done!");
    cluster.shutdown();
}

public boolean maxTokenReached(Long actualToken){
    return actualToken >= maxToken;
}

public String queryBuilder(Long nextRange) {
    return String.format("select token(key), key, my_column from mytable where token(key) >= %s limit 10000;", nextRange.toString());
}

public Long nextToken(Long token){
    return token + 1;
}

基本上,我所做的是搜索允许的最小令牌,并逐步进行,直到最后一个。

我不知道,但就像工作没有完全完成完全扫描,或者我的查询只访问了一个节点或其他东西。我不知道我是否做错了什么,或者真的不可能进行全面扫描。

今天,我有近 2 TB 的数据,在一个由七个节点组成的集群中只有一个表。

有人已经遇到过这种情况或有一些建议?


答案 1

在Cassandra中进行全表扫描绝对是可能的 - 事实上,对于像Spark这样的东西来说,这是很常见的。但是,它通常不是“快速”的,所以除非你知道你为什么要这样做,否则它是不鼓励的。对于您的实际问题:

1)如果您使用的是CQL,那么您几乎可以肯定使用的是Murmur3分区程序,因此您的最小令牌是-9223372036854775808(最大令牌是9223372036854775808)。

2)你正在使用ssion.execute(),它将使用默认的ONE一致性,这可能不会返回集群中的所有结果,特别是如果你也在ONE上写作,我怀疑你可能是这样。将其提高到 ALL,并使用预准备语句来加快 CQL 分析:

 public void run() {
     Cluster cluster = getConnection();
     Session session = cluster.connect("db");
     LOGGER.info("Starting ...");
     actualToken = -9223372036854775808;
     boolean run = true;
     int print = 0;

     while ( run ) {
         if (maxTokenReached(actualToken)) {
             LOGGER.info("Max Token Reached!");
             break;
         }
         SimpleStatement stmt = new SimpleStatement(queryBuilder(actualToken));
         stmt.setConsistencyLevel(ConsistencyLevel.ALL);
         ResultSet resultSet = session.execute(stmt);

         Iterator<Row> rows = resultSet.iterator();
         if ( !rows.hasNext()){
             break;
         }

         List<String> rowIds = new ArrayList<String>();

         while (rows.hasNext()) {
             Row row = rows.next();

             Long leadTime = row.getLong("my_column");
             if (myCondition(myCollumn)) {
                 String rowId = row.getString("key");
                 rowIds.add(rowId);
             }

             if (!rows.hasNext()) {
                 Long token = row.getLong("token(rowid)");
                 if (!rowIds.isEmpty()) {
                     LOGGER.info(String.format("Keys found! RowId's: %s ", rowIds));
                 }
             actualToken = nextToken(token);
             }
         }
      }
     LOGGER.info("Done!");
     cluster.shutdown(); 
  }

public boolean maxTokenReached(Long actualToken){
     return actualToken >= maxToken; 
 }

 public String queryBuilder(Long nextRange) {
     return String.format("select token(key), key, my_column from mytable where token(key) >= %s limit 10000;", nextRange.toString()); 
 }

 public Long nextToken(Long token) {
     return token + 1; 
 }

答案 2

我强烈建议使用Spark - 即使在独立的应用程序中(即没有集群)。它将负责将分区分组并逐个处理它们。死也易于使用:

https://github.com/datastax/spark-cassandra-connector


推荐