From cc7ad204e80003206d66f03b87e705864fa62923 Mon Sep 17 00:00:00 2001 From: thomaswoehlke <thomas.woehlke@gmail.com> Date: Sat, 6 Feb 2021 23:09:14 +0100 Subject: [PATCH] dev for Milestone 0.0.14 --- .../blueprints/rki/rki_service_update.py | 4 +- src/covid19/blueprints/who/who_service.py | 63 ++++++---- .../blueprints/who/who_service_update.py | 113 +++++++----------- src/covid19/blueprints/who/who_views.py | 92 +++++++++++--- 4 files changed, 166 insertions(+), 106 deletions(-) diff --git a/src/covid19/blueprints/rki/rki_service_update.py b/src/covid19/blueprints/rki/rki_service_update.py index 227521f8..26c5751b 100644 --- a/src/covid19/blueprints/rki/rki_service_update.py +++ b/src/covid19/blueprints/rki/rki_service_update.py @@ -1,7 +1,7 @@ from database import db, app from covid19.blueprints.rki.rki_model import RkiRegion, RkiDateReported, RkiCountry, RkiGermanyData -from covid19.blueprints.rki.rki_model_import import RkiGermanyDataImportTable +from covid19.blueprints.rki.rki_model_import import RkiBundeslaenderImport rki_service_update = None @@ -21,7 +21,7 @@ class RkiServiceUpdate: app.logger.info(" update who_date_reported [begin]") app.logger.info("------------------------------------------------------------") i = 0 - for i_date_reported, in RkiGermanyDataImportTable.get_dates_reported(): + for i_date_reported, in RkiBundeslaenderImport.get_dates_reported(): c = RkiDateReported.find_by_date_reported(i_date_reported) if c is None: o = RkiDateReported.create_new_object_factory(my_date_rep=i_date_reported) diff --git a/src/covid19/blueprints/who/who_service.py b/src/covid19/blueprints/who/who_service.py index 1203b27d..f9c84695 100644 --- a/src/covid19/blueprints/who/who_service.py +++ b/src/covid19/blueprints/who/who_service.py @@ -16,45 +16,66 @@ class WhoService: app.logger.debug("------------------------------------------------------------") app.logger.info(" WHO Service [ready]") - def run_download(self): - app.logger.info(" run update [begin]") + def run_download_only(self): + app.logger.info(" run_download_only [begin]") app.logger.info("------------------------------------------------------------") success = self.who_service_download.download_file() app.logger.info("") - app.logger.info(" run update [done]") + app.logger.info(" run_download_only [done]") app.logger.info("------------------------------------------------------------") return success - def run_update(self, import_file=True): - app.logger.info(" run update [begin]") + def run_import_only(self): + app.logger.info(" run_import_only [begin]") app.logger.info("------------------------------------------------------------") - if import_file: - self.who_service_import.import_file() - self.who_service_update.update_db() + self.who_service_import.import_file() app.logger.info("") - app.logger.info(" run update [done]") + app.logger.info(" run_import_only [done]") app.logger.info("------------------------------------------------------------") return self - def run_update_short(self, import_file=True): - app.logger.info(" run update short [begin]") + def run_update_dimension_tables_only(self): + app.logger.info(" run_update_only [begin]") app.logger.info("------------------------------------------------------------") - if import_file: - self.who_service_import.import_file() - self.who_service_update.update_db_short() + self.who_service_update.update_dimension_tables_only() app.logger.info("") - app.logger.info(" run update short [done]") + app.logger.info(" run_update_only [done]") app.logger.info("------------------------------------------------------------") return self - def run_update_initial(self, import_file=True): - app.logger.info(" run update initial [begin]") + def run_update_fact_table_incremental_only(self): + app.logger.info(" run_update_only [begin]") app.logger.info("------------------------------------------------------------") - if import_file: - self.who_service_import.import_file() - self.who_service_update.update_db_initial() + self.who_service_update.update_fact_table_incremental_only() app.logger.info("") - app.logger.info(" run update initial [done]") + app.logger.info(" run_update_only [done]") + app.logger.info("------------------------------------------------------------") + return self + + def run_update_fact_table_initial_only(self): + app.logger.info(" run_update_initial [begin]") + app.logger.info("------------------------------------------------------------") + self.who_service_update.update_fact_table_initial_only() + app.logger.info("") + app.logger.info(" run_update_initial [done]") + app.logger.info("------------------------------------------------------------") + return self + + def run_update_star_schema_incremental(self): + app.logger.info(" run_update_short [begin]") + app.logger.info("------------------------------------------------------------") + self.who_service_update.update_star_schema_incremental() + app.logger.info("") + app.logger.info(" run_update_short [done]") + app.logger.info("------------------------------------------------------------") + return self + + def run_update_star_schema_initial(self): + app.logger.info(" run_update_initial_full [begin]") + app.logger.info("------------------------------------------------------------") + self.who_service_update.update_star_schema_initial() + app.logger.info("") + app.logger.info(" run_update_initial_full [done]") app.logger.info("------------------------------------------------------------") return self diff --git a/src/covid19/blueprints/who/who_service_update.py b/src/covid19/blueprints/who/who_service_update.py index 36ec9e69..d9a75cee 100644 --- a/src/covid19/blueprints/who/who_service_update.py +++ b/src/covid19/blueprints/who/who_service_update.py @@ -15,7 +15,7 @@ class WhoServiceUpdate: app.logger.debug(" WHO Service Update [ready]") def __update_who_date_reported(self): - app.logger.info(" update who_date_reported [begin]") + app.logger.info(" __update_who_date_reported [begin]") app.logger.info("------------------------------------------------------------") for i_date_reported, in WhoGlobalDataImportTable.get_dates_reported(): c = WhoDateReported.find_by_date_reported(i_date_reported) @@ -27,12 +27,12 @@ class WhoServiceUpdate: else: app.logger.info(" update who_date_reported "+i_date_reported+" NOT added "+str(c.id)) app.logger.info("") - app.logger.info(" update who_date_reported [done]") + app.logger.info(" __update_who_date_reported [done]") app.logger.info("------------------------------------------------------------") return self def __update_who_region(self): - app.logger.info(" update who_region [begin]") + app.logger.info(" __update_who_region [begin]") app.logger.info("------------------------------------------------------------") for i_who_region, in WhoGlobalDataImportTable.get_regions(): c = WhoRegion.find_by_region(i_who_region) @@ -44,12 +44,12 @@ class WhoServiceUpdate: else: app.logger.info(i_who_region + " NOT added "+str(c.id)) app.logger.info("") - app.logger.info(" update who_region [done]") + app.logger.info(" __update_who_region [done]") app.logger.info("------------------------------------------------------------") return self def __update_who_country(self): - app.logger.info(" update who_country [begin]") + app.logger.info(" __update_who_country [begin]") app.logger.info("------------------------------------------------------------") result = WhoGlobalDataImportTable.countries() for result_item in result: @@ -78,48 +78,12 @@ class WhoServiceUpdate: app.logger.info(output) db.session.commit() app.logger.info("") - app.logger.info(" update who_country [done]") + app.logger.info(" __update_who_country [done]") app.logger.info("------------------------------------------------------------") return self - def __update_who_global_data(self): - app.logger.info(" update WHO [begin]") - app.logger.info("------------------------------------------------------------") - #dates_reported = WhoDateReported.get_all_as_dict() - #countries = WhoCountry.get_all_as_dict() - i = 0 - result = WhoGlobalDataImportTable.get_all() - for result_item in result: - if result_item.country_code != "": - my_country = WhoCountry.find_by_country_code(result_item.country_code) #countries[result_item.country_code] - else: - my_country = WhoCountry.find_by_country(result_item.country) - my_date_reported = WhoDateReported.find_by_date_reported(result_item.date_reported) #dates_reported[result_item.date_reported] - result_who_global_data = WhoGlobalData.find_one_or_none_by_date_and_country( - my_date_reported, - my_country) - if result_who_global_data is None: - o = WhoGlobalData( - cases_new=int(result_item.new_cases), - cases_cumulative=int(result_item.cumulative_cases), - deaths_new=int(result_item.new_deaths), - deaths_cumulative=int(result_item.cumulative_deaths), - date_reported=my_date_reported, - country=my_country - ) - db.session.add(o) - i += 1 - if i % 2000 == 0: - app.logger.info(" update WHO ... "+str(i)+" rows") - db.session.commit() - db.session.commit() - app.logger.info(" update WHO : "+str(i)+" rows total") - app.logger.info(" update WHO [done]") - app.logger.info("------------------------------------------------------------") - return self - - def __update_who_global_data_short(self): - app.logger.info(" update WHO short [begin]") + def __update_fact_table_incremental(self): + app.logger.info(" __update_fact_tables_incremental [begin]") app.logger.info("------------------------------------------------------------") new_dates_reported_from_import = WhoGlobalDataImportTable.get_new_dates_as_array() i = 0 @@ -150,12 +114,12 @@ class WhoServiceUpdate: db.session.commit() app.logger.info(" update WHO short ... " + str(i) + " rows ["+ str(my_date) +"]") app.logger.info(" update WHO short : "+str(i)+" rows total") - app.logger.info(" update WHO short [done]") + app.logger.info(" __update_fact_tables_incremental [done]") app.logger.info("------------------------------------------------------------") return self - def __update_who_global_data_initial(self): - app.logger.info(" update WHO initial [begin]") + def __update_fact_table_initial(self): + app.logger.info(" __update_fact_table_initial [begin]") app.logger.info("------------------------------------------------------------") WhoGlobalData.remove_all() new_dates_reported_from_import = WhoGlobalDataImportTable.get_new_dates_as_array() @@ -185,39 +149,54 @@ class WhoServiceUpdate: db.session.commit() db.session.commit() app.logger.info(" update WHO initial : "+str(i)+" total rows") - app.logger.info(" update WHO initial [done]") + app.logger.info(" __update_fact_table_initial [done]") app.logger.info("------------------------------------------------------------") return self - def update_db(self): - app.logger.info(" update db [begin]") - app.logger.info("------------------------------------------------------------") + def __update_dimension_tables(self): self.__update_who_date_reported() self.__update_who_region() self.__update_who_country() - self.__update_who_global_data() - app.logger.info(" update db [done]") + return self + + def update_dimension_tables_only(self): + app.logger.info(" update_dimension_tables_only [begin]") + app.logger.info("------------------------------------------------------------") + self.__update_dimension_tables() + app.logger.info(" update_dimension_tables_only [done]") app.logger.info("------------------------------------------------------------") return self - def update_db_short(self): - app.logger.info(" update db short [begin]") + def update_fact_table_incremental_only(self): + app.logger.info(" update_fact_tables_incremental_only [begin]") app.logger.info("------------------------------------------------------------") - self.__update_who_date_reported() - self.__update_who_region() - self.__update_who_country() - self.__update_who_global_data_short() - app.logger.info(" update db short [done]") + self.__update_fact_table_incremental() + app.logger.info(" update_fact_tables_incremental_only [done]") app.logger.info("------------------------------------------------------------") return self - def update_db_initial(self): - app.logger.info(" update db initial [begin]") + def update_fact_table_initial_only(self): + app.logger.info(" update_fact_tables_initial_only [begin]") app.logger.info("------------------------------------------------------------") - self.__update_who_date_reported() - self.__update_who_region() - self.__update_who_country() - self.__update_who_global_data_initial() - app.logger.info(" update db initial [done]") + self.__update_fact_table_initial() + app.logger.info(" update_fact_tables_initial_only [done]") + app.logger.info("------------------------------------------------------------") + return self + + def update_star_schema_incremental(self): + app.logger.info(" update_star_schema_incremental [begin]") + app.logger.info("------------------------------------------------------------") + self.__update_dimension_tables() + self.__update_fact_table_incremental() + app.logger.info(" update_star_schema_incremental [done]") + app.logger.info("------------------------------------------------------------") + return self + + def update_star_schema_initial(self): + app.logger.info(" update_star_schema_initial [begin]") + app.logger.info("------------------------------------------------------------") + self.__update_dimension_tables() + self.__update_fact_table_initial() + app.logger.info(" update_star_schema_initial [done]") app.logger.info("------------------------------------------------------------") return self diff --git a/src/covid19/blueprints/who/who_views.py b/src/covid19/blueprints/who/who_views.py index f02bedc8..c0a0b7cd 100644 --- a/src/covid19/blueprints/who/who_views.py +++ b/src/covid19/blueprints/who/who_views.py @@ -23,11 +23,11 @@ app_who = Blueprint('who', __name__, template_folder='templates') @celery.task(bind=True) -def task_who_run_update(self, import_file=True): +def task_who_run_update_full(self, import_file=True): self.update_state(state=states.STARTED) logger = get_task_logger(__name__) logger.info("------------------------------------------------------------") - logger.info(" Received: task_who_run_update [OK] ") + logger.info(" Received: task_who_run_update_full [OK] ") logger.info("------------------------------------------------------------") who_service.run_update(import_file) self.update_state(state=states.SUCCESS) @@ -61,6 +61,32 @@ def task_who_update_initial(self): return result +@celery.task(bind=True) +def task_who_import_only(self): + logger = get_task_logger(__name__) + self.update_state(state=states.STARTED) + logger.info("------------------------------------------------------------") + logger.info(" Received: task_who_import_only [OK] ") + logger.info("------------------------------------------------------------") + who_service.who_service_import.import_file() + self.update_state(state=states.SUCCESS) + result = "OK (task_who_import_only)" + return result + + +@celery.task(bind=True) +def task_who_update_only(self): + logger = get_task_logger(__name__) + self.update_state(state=states.STARTED) + logger.info("------------------------------------------------------------") + logger.info(" Received: task_who_update_only [OK] ") + logger.info("------------------------------------------------------------") + who_service.who_service_update.update_only_db() + self.update_state(state=states.SUCCESS) + result = "OK (task_who_update_only)" + return result + + @app_who.route('/info') def url_who_info(): page_info = ApplicationPage('WHO', "Info") @@ -354,30 +380,64 @@ def url_who_germany(page=1): page_info=page_info) -@app_who.route('/update') -def url_who_update_run(): - app.logger.info("url_who_update_run [start]") +@app_who.route('/task/download/only') +def url_who_task_download_only(): + app.logger.info("url_who_task_download_only [start]") who_service.who_service_download.download_file() - task_who_run_update.apply_async() + flash("who_service.who_service_download.download_file ok") + flash(message="long running background task started", category="warning") + app.logger.info("url_who_task_download_only [done]") + return redirect(url_for('url_who_tasks')) + + +@app_who.route('/task/import/only') +def url_who_task_import_only(): + app.logger.info("url_who_update_run [start]") + task_who_import_only.apply_async() flash("who_service.run_update started") flash(message="long running background task started", category="warning") app.logger.info("url_who_update_run [done]") - return redirect(url_for('url_home')) + return redirect(url_for('url_who_tasks')) -@app_who.route('/update/short') -def url_who_update_short_run(): +@app_who.route('/task/update/only') +def url_who_task_update_only(): + app.logger.info("url_who_update_run [start]") + task_who_update_only.apply_async() + flash("task_who_update_only started") + flash(message="long running background task started", category="warning") + app.logger.info("url_who_update_run [done]") + return redirect(url_for('url_who_tasks')) + + +@app_who.route('/task/update/initial') +def url_who_task_update_initial(): + who_service.who_service_download.download_file() + flash("who_service.who_service_download.download_file ok") + task_who_update_initial.apply_async() + flash("task_who_update_initial started") + flash(message="long running background task started", category="warning") + return redirect(url_for('url_who_tasks')) + + +@app_who.route('/task/update/short') +def url_who_task_update_short(): who_service.who_service_download.download_file() + flash("who_service.who_service_download.download_file ok") task_who_update_short.apply_async() - flash("who_service.run_update_short started") + flash("task_who_update_short started") flash(message="long running background task started", category="warning") - return redirect(url_for('url_home')) + return redirect(url_for('url_who_tasks')) -@app_who.route('/update/initial') -def url_who_update_initial_run(): +@app_who.route('/task/update/full') +def url_who_task_update_full(): + app.logger.info("url_who_task_update_full [start]") who_service.who_service_download.download_file() - task_who_update_initial.apply_async() - flash("who_service.run_update_short started") + flash("who_service.who_service_download.download_file ok") + task_who_run_update_full.apply_async() + flash("task_who_run_update_full started") flash(message="long running background task started", category="warning") - return redirect(url_for('url_home')) + app.logger.info("url_who_task_update_full [done]") + return redirect(url_for('url_who_tasks')) + -- GitLab