suspended=1 does not stop data processing #2572

Open
PavelShilin89 opened this issue Sep 17, 2024 · 6 comments

@PavelShilin89
Contributor

PavelShilin89 commented Sep 17, 2024

In the Kafka and Manticore integration test, setting suspended=1 on a materialized view does not stop data processing. Note that the problem only shows up under certain conditions.
I reproduced it as follows:

MRE


docker network create app-network --driver bridge > /dev/null; echo $?
docker run -it -e EXTRA=1 --network=app-network --platform linux/amd64 --name manticore -d ghcr.io/manticoresoftware/manticoresearch:test-kit-latest bash > /dev/null 2>&1; echo $?
docker exec manticore sed -i '/data_dir = \/var\/lib\/manticore/a\    buddy_path = manticore-executor -n /usr/share/manticore/modules/manticore-buddy/src/main.php --debugv\n' /etc/manticoresearch/manticore.conf
docker exec manticore searchd
docker run -it -d -e EXTRA=1 --network=app-network --name kafka -v ./test/clt-tests/integrations/kafka/import.sh:/import.sh -v ./test/clt-tests/integrations/kafka/dump.json:/tmp/dump.json -e KAFKA_CFG_NODE_ID=0 -e KAFKA_CFG_PROCESS_ROLES=controller,broker -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092 -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT bitnami/kafka:3.7.0 > /dev/null 2>&1; echo $?
docker exec kafka kafka-topics.sh --create --topic my-data --partitions 4 --bootstrap-server localhost:9092 2>&1 | grep -o 'Created topic my-data\.' | head -n 1
docker exec manticore mysql -h0 -P9306 -e "CREATE SOURCE kafka (id bigint, term text, abbrev text, GlossDef json) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore' num_consumers='1' batch=50;"; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE TABLE destination_kafka (id bigint, name text, short_name text, received_at text, size multi);"; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE MATERIALIZED VIEW view_table TO destination_kafka AS SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size FROM kafka"; echo $?
docker exec kafka chmod +x ./import.sh; docker exec kafka ./import.sh; echo $?
timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -m2 "REPLACE+INTO+destination_kafka" > /dev/null && echo "Data processing completed."' || echo "Data processing failed."
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka;"
docker exec manticore mysql -h0 -P9306 -e "SELECT * FROM destination_kafka ORDER BY id ASC;"
docker exec manticore mysql -h0 -P9306 -e "DROP SOURCE kafka;"; echo $?
docker exec manticore mysql -h0 -P9306 -e "DROP table destination_kafka;"; echo $?
docker exec manticore mysql -h0 -P9306 -e "SHOW TABLES;"
docker exec kafka kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group manticore_destination --reset-offsets --to-latest --topic my-data --execute > /dev/null; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE SOURCE kafka (id bigint, term text, abbrev text, GlossDef json, location json) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore_destination' num_consumers='1' batch=50;"; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE TABLE destination_kafka (id bigint, name text, short_name text, received_at text, size multi, lat float, lon float, distance float);"; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE MATERIALIZED VIEW view_table_destination TO destination_kafka AS SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size, location.lat as lat, location.lon as lon, GEODIST(lat, lon, 49.0, 3.0) AS distance FROM kafka"; echo $?
docker exec kafka ./import.sh; echo $?
timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -m2 "REPLACE+INTO+destination_kafka" > /dev/null && echo "Data processing completed."' || echo "Data processing failed."
docker exec manticore mysql -h0 -P9306 -e "SELECT id, name, short_name, received_at, size, lat, lon, distance FROM destination_kafka ORDER BY id ASC;"
docker exec kafka kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group manticore_metadata --reset-offsets --to-latest --topic my-data --execute > /dev/null; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE SOURCE kafka_metadata (id bigint, term text, abbrev text, GlossDef json, metadata json) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore_metadata' num_consumers='1' batch=50;"; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE TABLE destination_kafka_metadata (id bigint, name text, short_name text, received_at text, size multi, views int, info text);"; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE MATERIALIZED VIEW view_table_metadata TO destination_kafka_metadata AS SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size, metadata.views as views, metadata.info as info FROM kafka_metadata WHERE views > 1000;"; echo $?
docker exec kafka ./import.sh; echo $?
timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -m2 "REPLACE+INTO+destination_kafka_metadata" > /dev/null && echo "Data processing completed."' || echo "Data processing failed."
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_metadata;"
docker exec manticore mysql -h0 -P9306 -e "SELECT * FROM destination_kafka_metadata ORDER BY id ASC;"
docker exec kafka kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group manticore_tags --reset-offsets --to-latest --topic my-data --execute > /dev/null; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE SOURCE kafka_tags (id bigint, term text, abbrev text, GlossDef json, tags json) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore_tags' num_consumers='1' batch=50;"; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE TABLE destination_kafka_tags (id bigint, name text, short_name text, received_at text, size multi, tags json);"; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE MATERIALIZED VIEW view_table_tags TO destination_kafka_tags AS SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size, tags FROM kafka_tags WHERE tags IN ('item1', 'item2');"; echo $?
docker exec kafka ./import.sh; echo $?
timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -m2 "REPLACE+INTO+destination_kafka_tags" > /dev/null && echo "Data processing completed."' || echo "Data processing failed."
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_tags;"
docker exec manticore mysql -h0 -P9306 -e "SELECT * FROM destination_kafka_tags ORDER BY id ASC;"
docker exec kafka kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group manticore_alter --reset-offsets --to-latest --topic my-data --execute > /dev/null; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE SOURCE kafka_alter (id bigint, term text, abbrev text, GlossDef json, metadata json) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore_alter' num_consumers='1' batch=50;"; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE TABLE destination_kafka_alter (id bigint, name text, short_name text, received_at text, size multi, views bigint);"; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE MATERIALIZED VIEW view_table_alter TO destination_kafka_alter AS SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size, metadata.views as views FROM kafka_alter;"; echo $?
docker exec kafka ./import.sh; echo $?
timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -m1 "REPLACE+INTO+destination_kafka_alter" > /dev/null && echo "Data processing completed."' || echo "Data processing failed."
docker exec manticore mysql -h0 -P9306 -e "ALTER MATERIALIZED VIEW view_table_alter suspended=1;"; echo $?
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
sleep 10; docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
docker exec manticore mysql -h0 -P9306 -e "ALTER MATERIALIZED VIEW view_table_alter suspended=0;"; echo $?
timeout 120 bash -c 'while [[ $(docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;" | grep -o "[0-9]*") -ne 57 ]]; do sleep 1; done && echo "Data processing completed."'
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
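While the view is suspended, the MRE samples COUNT(*) once, 10 seconds after suspended=1. A minimal sketch of a stricter check that samples repeatedly and fails on the first change. The function name assert_count_stable is hypothetical, and the sketch assumes the mysql client's -N (skip column names) and -B (batch) options so that the query prints a bare number instead of a table.

```shell
#!/usr/bin/env bash
# Hypothetical helper: fail if a row count changes across several samples.
# Intended for the window between "suspended=1" and "suspended=0".
# Usage: assert_count_stable <samples> <interval_seconds> <command...>
# The command must print a bare count (e.g. mysql -N -B ... "SELECT COUNT(*) ...").
assert_count_stable() {
  local samples=$1 interval=$2
  shift 2
  local first current i
  first=$("$@") || return 2            # command failed before any comparison
  for (( i = 1; i < samples; i++ )); do
    sleep "$interval"
    current=$("$@") || return 2
    if [[ "$current" != "$first" ]]; then
      echo "count changed while suspended: $first -> $current" >&2
      return 1
    fi
  done
  echo "count stable at $first"
}
```

With the containers from the MRE, right after the suspended=1 step, this could be called as `assert_count_stable 10 1 docker exec manticore mysql -h0 -P9306 -N -B -e "SELECT COUNT(*) FROM destination_kafka_alter;"`, which samples once per second for about 10 seconds instead of relying on a single reading.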


Expected result:

––– input –––
docker exec kafka ./import.sh; echo $?
––– output –––
0
––– input –––
timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -m1 "REPLACE+INTO+destination_kafka_alter" > /dev/null && echo "Data processing completed."' || echo "Data processing failed."
––– output –––
Data processing completed.
––– input –––
docker exec manticore mysql -h0 -P9306 -e "ALTER MATERIALIZED VIEW view_table_alter suspended=1;"; echo $?
––– output –––
0
––– input –––
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
––– output –––
+----------+
| count(*) |
+----------+
|       50 |
+----------+
––– input –––
sleep 10; docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
––– output –––
+----------+
| count(*) |
+----------+
|       50 |
+----------+
––– input –––
docker exec manticore mysql -h0 -P9306 -e "ALTER MATERIALIZED VIEW view_table_alter suspended=0;"; echo $?
––– output –––
0
––– input –––
timeout 120 bash -c 'while [[ $(docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;" | grep -o "[0-9]*") -ne 57 ]]; do sleep 1; done && echo "Data processing completed."'
––– output –––
Data processing completed.
––– input –––
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
––– output –––
+----------+
| count(*) |
+----------+
|       57 |
+----------+

Current result:

––– input –––
docker exec kafka ./import.sh; echo $?
––– output –––
0
––– input –––
timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -m1 "REPLACE+INTO+destination_kafka_alter" > /dev/null && echo "Data processing completed."' || echo "Data processing failed."
––– output –––
Data processing completed.
––– input –––
docker exec manticore mysql -h0 -P9306 -e "ALTER MATERIALIZED VIEW view_table_alter suspended=1;"; echo $?
––– output –––
0
––– input –––
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
––– output –––
+----------+
| count(*) |
+----------+
|       50 |
+----------+
––– input –––
sleep 10; docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
––– output –––
+----------+
| count(*) |
+----------+
|       57 |
+----------+
––– input –––
docker exec manticore mysql -h0 -P9306 -e "ALTER MATERIALIZED VIEW view_table_alter suspended=0;"; echo $?
––– output –––
0
––– input –––
timeout 120 bash -c 'while [[ $(docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;" | grep -o "[0-9]*") -ne 57 ]]; do sleep 1; done && echo "Data processing completed."'
––– output –––
Data processing completed.
––– input –––
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
––– output –––
+----------+
| count(*) |
+----------+
|       57 |
+----------+
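The two results differ only in the sample taken 10 seconds after suspended=1. A small arithmetic sketch of the expected counts, assuming one run of import.sh yields 57 messages (the final count in both results) and that batch=50 in CREATE SOURCE means rows reach the table in batches of up to 50. The helper name batch_counts is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: cumulative row counts after each batch, assuming the
# source writes full batches of <batch_size> and then one partial final batch.
# Usage: batch_counts <total_messages> <batch_size>
batch_counts() {
  local total=$1 batch=$2 written=0
  (( batch > 0 )) || return 1          # avoid an endless loop on batch=0
  while (( written < total )); do
    written=$(( written + batch ))
    if (( written > total )); then
      written=$total                   # last batch is smaller than batch_size
    fi
    echo "$written"
  done
}
```

`batch_counts 57 50` prints 50 and then 57. Under these assumptions the table should hold 50 rows for as long as the view is suspended; the Current result shows 57 at the 10-second sample, which suggests the remaining batch of 7 rows was written while the view was suspended.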

Test data:
dump.json

The test start command:

RUN_ARGS='--privileged -v ".:/docker"' /Users/pavelshilin/Desktop/WORK/clt/clt test -t ./test/clt-tests/integrations/kafka/test-integration-kafka-ms.rec -d manticoresearch/dind:v1
@djklim87
Contributor

Can you remove the unnecessary code and leave only the code related to the issue?

djklim87 assigned PavelShilin89 and unassigned djklim87 on Sep 18, 2024
@PavelShilin89
Contributor Author

@djklim87 This is exactly the code that relates to the problem and reproduces it. If the code is changed, or the problematic part is moved elsewhere, the error does not reproduce.
For clarity, the failing part is shown under Current result.

@djklim87
Contributor

Test your MRE before putting it into the issue.

  1. You create the table destination_kafka twice, so your results differ from what you wrote in the issue.
  2. You check the logs with grep -m1 "REPLACE+INTO+destination_kafka", but your table is named destination_kafka_alter, so you should get the results shown below:
Data processing completed.
0
+----------+
| count(*) |
+----------+
|        0 |
+----------+
+----------+
| count(*) |
+----------+
|        0 |
+----------+
0
Data processing completed.
+----------+
| count(*) |
+----------+
|       57 |
+----------+
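Point 2 comes down to substring matching: "REPLACE+INTO+destination_kafka" is also a prefix of "REPLACE+INTO+destination_kafka_alter", and it can match log lines written for the first destination_kafka table earlier in the run. A minimal sketch comparing the prefix pattern with one that requires the table name to end at a non-identifier character. The two log lines are hypothetical (only the URL-encoded REPLACE+INTO+<table> fragment comes from the MRE; the real searchd.log lines may look different), and the helper names are hypothetical too.

```shell
#!/usr/bin/env bash
# Two hypothetical log lines; the real searchd.log format may differ.
sample_log=$'query=REPLACE+INTO+destination_kafka+VALUES\nquery=REPLACE+INTO+destination_kafka_alter+VALUES'

# What a bare `grep "REPLACE+INTO+<table>"` sees.
# -F treats the pattern as a fixed string, so "+" is literal.
count_prefix_matches() {
  printf '%s\n' "$sample_log" | grep -cF "REPLACE+INTO+$1"
}

# Require the table name to be followed by a non-identifier character
# (or end of line), so destination_kafka does not match destination_kafka_alter.
count_exact_matches() {
  printf '%s\n' "$sample_log" | grep -cE "REPLACE\+INTO\+$1([^[:alnum:]_]|\$)"
}
```

Here `count_prefix_matches destination_kafka` counts both lines, while `count_exact_matches destination_kafka` and `count_exact_matches destination_kafka_alter` each count one. Separately, tail -f first prints the last 10 lines already in the file before following it, so a match left over from an earlier step can end the wait at once; tail -n 0 -f starts at the current end of the file instead.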

djklim87 removed their assignment on Sep 18, 2024
@PavelShilin89
Contributor Author

@djklim87 MRE updated

PavelShilin89 removed their assignment on Sep 23, 2024
@djklim87
Contributor

djklim87 commented Sep 25, 2024

Reproducible MRE

Run dind

docker run -it --privileged --name dind manticoresearch/dind:v1

All commands

docker exec -it dind bash

cd /tmp
wget https://raw.githubusercontent.com/manticoresoftware/manticoresearch/refs/heads/test/native-support-for-message-queues/test/clt-tests/integrations/kafka/import.sh
wget https://raw.githubusercontent.com/manticoresoftware/manticoresearch/refs/heads/test/native-support-for-message-queues/test/clt-tests/integrations/kafka/dump.json
chmod +x ./import.sh

docker network create app-network --driver bridge > /dev/null; echo $?
docker run -it -e EXTRA=1 --network=app-network --platform linux/amd64 --name manticore -d ghcr.io/manticoresoftware/manticoresearch:test-kit-latest bash ; echo $?
docker exec manticore sed -i '/data_dir = \/var\/lib\/manticore/a\    buddy_path = manticore-executor -n /usr/share/manticore/modules/manticore-buddy/src/main.php --debugv\n' /etc/manticoresearch/manticore.conf
docker exec manticore searchd
docker run -it -d -e EXTRA=1 --network=app-network --name kafka -v /tmp/import.sh:/import.sh -v /tmp/dump.json:/tmp/dump.json -e KAFKA_CFG_NODE_ID=0 -e KAFKA_CFG_PROCESS_ROLES=controller,broker -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092 -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT bitnami/kafka:3.7.0 ; echo $?
docker exec kafka kafka-topics.sh --create --topic my-data --partitions 4 --bootstrap-server localhost:9092 2>&1 | grep -o 'Created topic my-data\.' | head -n 1



docker exec manticore mysql -h0 -P9306 -e "CREATE SOURCE kafka (id bigint, term text, abbrev text, GlossDef json) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore' num_consumers='1' batch=50;"; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE TABLE destination_kafka (id bigint, name text, short_name text, received_at text, size multi);"; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE MATERIALIZED VIEW view_table TO destination_kafka AS SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size FROM kafka"; echo $?
docker exec kafka ./import.sh; echo $?
timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -m2 "REPLACE+INTO+destination_kafka" > /dev/null && echo "Data processing completed."' || echo "Data processing failed."
docker exec manticore mysql -h0 -P9306 -e "DROP SOURCE kafka;"; echo $?
docker exec manticore mysql -h0 -P9306 -e "DROP table destination_kafka;"; echo $?



echo "----------------------------------------------- RUN ------------------------------------------"
docker exec kafka kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group manticore_alter --reset-offsets --to-latest --topic my-data --execute > /dev/null; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE SOURCE kafka_alter (id bigint, term text, abbrev text, GlossDef json, metadata json) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore_alter' num_consumers='1' batch=50;"; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE TABLE destination_kafka_alter (id bigint, name text, short_name text, received_at text, size multi, views bigint);"; echo $?
docker exec manticore mysql -h0 -P9306 -e "CREATE MATERIALIZED VIEW view_table_alter TO destination_kafka_alter AS SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size, metadata.views as views FROM kafka_alter;"; echo $?
docker exec kafka ./import.sh; echo $?
timeout 60 bash -c 'docker exec manticore bash -c "tail -f /var/log/manticore/searchd.log" | grep -m1 "REPLACE+INTO+destination_kafka_alter" > /dev/null && echo "Data processing completed."' || echo "Data processing failed."
docker exec manticore mysql -h0 -P9306 -e "ALTER MATERIALIZED VIEW view_table_alter suspended=1;"; echo $?

echo "----------------------------------------------- Should be 50 ------------------------------------------"
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
echo "----------------------------------------------- Should be 50 ------------------------------------------"
sleep 10; docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
docker exec manticore mysql -h0 -P9306 -e "ALTER MATERIALIZED VIEW view_table_alter suspended=0;"; echo $?
echo "----------------------------------------------- Should be 57 ------------------------------------------"
timeout 120 bash -c 'while [[ $(docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;" | grep -o "[0-9]*") -ne 57 ]]; do sleep 1; done && echo "Data processing completed."'
docker exec manticore mysql -h0 -P9306 -e "SELECT COUNT(*) FROM destination_kafka_alter;"
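The final check polls COUNT(*) once per second under timeout 120 until it reads 57. A minimal sketch of the same wait as a reusable function that also reports the last value seen, which shows how far processing got if it never reaches the target. wait_for_count is a hypothetical name, and the sketch again assumes mysql -N -B so the query prints a bare number.

```shell
#!/usr/bin/env bash
# Hypothetical helper: poll a count until it equals the target or attempts run out.
# Usage: wait_for_count <target> <attempts> <interval_seconds> <command...>
wait_for_count() {
  local target=$1 attempts=$2 interval=$3
  shift 3
  local current="" i
  for (( i = 0; i < attempts; i++ )); do
    current=$("$@")                    # empty if the query fails; keep polling
    if [[ "$current" == "$target" ]]; then
      echo "reached $target"
      return 0
    fi
    sleep "$interval"
  done
  echo "gave up at ${current:-<no output>} (wanted $target)" >&2
  return 1
}
```

With the MRE's containers, `wait_for_count 57 120 1 docker exec manticore mysql -h0 -P9306 -N -B -e "SELECT COUNT(*) FROM destination_kafka_alter;"` roughly matches the original loop's 120-second budget.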


@djklim87
Contributor

Projects
None yet
Development

No branches or pull requests

2 participants