diff --git a/src/covid19/blueprints/ecdc/ecdc_model.py b/src/covid19/blueprints/ecdc/ecdc_model.py index c6a7372ea97902cc54ea413f7dc1f28d6c87fcf8..a07e10ac51c7c80f533f4f50243bf067b498cef3 100644 --- a/src/covid19/blueprints/ecdc/ecdc_model.py +++ b/src/covid19/blueprints/ecdc/ecdc_model.py @@ -162,39 +162,39 @@ class EcdcData(db.Model): return db.session.query(cls).filter(cls.id == other_id).one() @classmethod - def find_by_date_reported(cls, europe_date_reported, page): - #TODO: * Issue #43 /europe/date_reported + def find_by_date_reported(cls, ecdc_date_reported, page): + #TODO: * Issue #43 /ecdc/date_reported return db.session.query(cls).filter( - cls.europe_date_reported_id == europe_date_reported.id)\ + cls.ecdc_date_reported_id == ecdc_date_reported.id)\ .order_by(cls.notification_rate_per_100000_population_14days.desc())\ .paginate(page, per_page=ITEMS_PER_PAGE) @classmethod - def find_by_date_reported_notification_rate(cls, europe_date_reported, page): - # TODO: * Issue #43 /europe/date_reported + def find_by_date_reported_notification_rate(cls, ecdc_date_reported, page): + # TODO: * Issue #43 /ecdc/date_reported return db.session.query(cls).filter( - cls.europe_date_reported_id == europe_date_reported.id) \ + cls.ecdc_date_reported_id == ecdc_date_reported.id) \ .order_by(cls.notification_rate_per_100000_population_14days.desc()) \ .paginate(page, per_page=ITEMS_PER_PAGE) @classmethod - def find_by_date_reported_deaths_weekly(cls, europe_date_reported, page): - # TODO: * Issue #43 /europe/date_reported + def find_by_date_reported_deaths_weekly(cls, ecdc_date_reported, page): + # TODO: * Issue #43 /ecdc/date_reported return db.session.query(cls).filter( - cls.europe_date_reported_id == europe_date_reported.id) \ + cls.ecdc_date_reported_id == ecdc_date_reported.id) \ .order_by(cls.deaths_weekly.desc()) \ .paginate(page, per_page=ITEMS_PER_PAGE) @classmethod - def find_by_date_reported_cases_weekly(cls, europe_date_reported, page): - # TODO: * Issue #43 /europe/date_reported + def find_by_date_reported_cases_weekly(cls, ecdc_date_reported, page): + # TODO: * Issue #43 /ecdc/date_reported return db.session.query(cls).filter( - cls.europe_date_reported_id == europe_date_reported.id) \ + cls.ecdc_date_reported_id == ecdc_date_reported.id) \ .order_by(cls.cases_weekly.desc()) \ .paginate(page, per_page=ITEMS_PER_PAGE) @classmethod - def find_by_country(cls, europe_country, page): + def find_by_country(cls, ecdc_country, page): return db.session.query(cls).filter( - cls.europe_country_id == europe_country.id)\ + cls.ecdc_country_id == ecdc_country.id)\ .paginate(page, per_page=ITEMS_PER_PAGE) diff --git a/src/covid19/blueprints/ecdc/ecdc_model_import.py b/src/covid19/blueprints/ecdc/ecdc_model_import.py index 7097454849cc4bc88f55d63779811c9a25a27544..04c70daaf72b6c7151b012ff4cf4cd3979e53c81 100644 --- a/src/covid19/blueprints/ecdc/ecdc_model_import.py +++ b/src/covid19/blueprints/ecdc/ecdc_model_import.py @@ -24,8 +24,7 @@ class EcdcImport(db.Model): return None @classmethod - def get_all_as_page(cls, page): - #TODO: #51 order_by: year_week, country + def get_all_as_page(cls, page: int): return db.session.query(cls).order_by( cls.year_week, cls.countries_and_territories @@ -39,55 +38,71 @@ class EcdcImport(db.Model): ).all() @classmethod - def get_by_id(cls, other_id): + def get_by_id(cls, other_id: int): return db.session.query(cls).filter(cls.id == other_id).one() @classmethod def get_date_rep(cls): # TODO: #109 SQLalchemy instead of SQL in: EcdcImport.get_date_rep - sql = "select distinct date_rep, year_week from edcd_import order by year_week desc" + # sql = "select distinct date_rep, year_week from edcd_import order by year_week desc" #return db.session.execute(sql).fetchall() - return db.session.query(cls.date_rep, cls.year_week)\ - .order_by(cls.date_rep.desc(), cls.year_week.desc())\ + return db.session.query(cls.date_rep) \ + .group_by(cls.date_rep) \ + .order_by(cls.date_rep.desc())\ .distinct().all() @classmethod def get_continent(cls): # TODO: #110 SQLalchemy instead of SQL in: EcdcImport.get_continent - sql = "select distinct continent_exp from edcd_import order by continent_exp asc" + # sql = "select distinct continent_exp from edcd_import order by continent_exp asc" #return db.session.execute(sql).fetchall() return db.session.query(cls.continent_exp) \ - .order_by(cls.continent_exp.desc()) \ + .group_by(cls.continent_exp) \ + .order_by(cls.continent_exp.asc()) \ .distinct().all() @classmethod - def get_countries_of_continent(cls, my_continent): + def get_countries_of_continent(cls, my_continent: str): my_continent_exp = my_continent.region my_params = {} my_params['my_continent_param'] = my_continent_exp #TODO: #107 SQLalchemy instead of SQL in: EcdcImport.get_countries_of_continent #TODO: #108 BUG: change to ORM ClassHierarchy in: EcdcImport.get_countries_of_continent - sql = """ - select distinct - countries_and_territories, - geo_id, - country_territory_code, - pop_data_2019, - continent_exp - from - ecdc_import - group by - countries_and_territories, - geo_id, - country_territory_code, - pop_data_2019, - continent_exp - having - continent_exp = :my_continent_param - order by - countries_and_territories - """ - return db.session.execute(sql, my_params).fetchall() + return db.session.query( + cls.countries_and_territories, + cls.geo_id, + cls.country_territory_code, + cls.pop_data_2019, + cls.continent_exp + ).filter( + cls.continent_exp == my_continent + ).group_by( + cls.countries_and_sterritories, + cls.geo_id, + cls.country_territory_code, + cls.pop_data_2019, + cls.continent_exp + ).order_by(cls.countries_and_territories.asc()).distinct().all() + #sql = """ + #select distinct + # countries_and_territories, + # geo_id, + # pop_data_2019, + # continent_exp + #from + # ecdc_import + #group by + # countries_and_territories, + # geo_id, + # country_territory_code, + # pop_data_2019, + # continent_exp + #having + # continent_exp = :my_continent_param + #order by + # countries_and_territories + #""" + #return db.session.execute(sql, my_params).fetchall() @classmethod def find_by_date_reported(cls, europe_date_reported): diff --git a/src/covid19/blueprints/ecdc/ecdc_service.py b/src/covid19/blueprints/ecdc/ecdc_service.py index 09e1483c0424587eafa7bd8bda1aeeec3883acb0..2062606ad9107e508fcc2c0056827ef2948705c0 100644 --- a/src/covid19/blueprints/ecdc/ecdc_service.py +++ b/src/covid19/blueprints/ecdc/ecdc_service.py @@ -10,58 +10,63 @@ from covid19.blueprints.ecdc.ecdc_service_update import EcdcServiceUpdate class EcdcService: def __init__(self, database): app.logger.debug("------------------------------------------------------------") - app.logger.debug(" Europe Service [init]") + app.logger.debug(" ECDC Service [init]") app.logger.debug("------------------------------------------------------------") self.__database = database - self.europe_service_download = EcdcServiceDownload(database) - self.europe_service_import = EcdcServiceImport(database) - self.europe_service_update = EcdcServiceUpdate(database) + self.ecdc_service_download = EcdcServiceDownload(database) + self.ecdc_service_import = EcdcServiceImport(database) + self.ecdc_service_update = EcdcServiceUpdate(database) app.logger.debug("------------------------------------------------------------") - app.logger.info(" Europe Service [ready] ") + app.logger.info(" ECDC Service [ready] ") def pretask_database_drop_create(self): flash("ecdc_service.download started") - self.europe_service_download.download() + self.ecdc_service_download.download() return self def task_database_drop_create(self): - self.europe_service_import.import_datafile_to_db() - self.europe_service_update.update_db_short() + self.ecdc_service_import.import_datafile_to_db() + self.ecdc_service_update.update_db_short() return self def run_download_only(self): - self.europe_service_download.download() + self.ecdc_service_download.download() return self def run_import_only(self): - self.europe_service_import.import_datafile_to_db() + self.ecdc_service_import.import_datafile_to_db() return self # TODO: #112 implement EcdcService.run_update_dimension_tables_only # TODO: #111 refactor to new method scheme itroduced 07.02.2021 def run_update_dimension_tables_only(self): + self.ecdc_service_update.update_dimension_tables_only() return self # TODO: #113 implement EcdcService.run_update_fact_table_incremental_only # TODO: #111 refactor to new method scheme itroduced 07.02.2021 def run_update_fact_table_incremental_only(self): + self.ecdc_service_update.update_fact_table_incremental_only() return self # TODO: #114 implement EcdcService.run_update_fact_table_initial_only # TODO: #111 refactor to new method scheme itroduced 07.02.2021 def run_update_fact_table_initial_only(self): + self.ecdc_service_update.update_fact_table_initial_only() return self # TODO: #115 implement EcdcService.run_update_star_schema_incremental # TODO: #111 refactor to new method scheme itroduced 07.02.2021 def run_update_star_schema_incremental(self): + self.ecdc_service_update.update_star_schema_incremental() return self # TODO: #116 implement EcdcService.run_update_star_schema_initial # TODO: #111 refactor to new method scheme itroduced 07.02.2021 def run_update_star_schema_initial(self): + self.ecdc_service_update.update_star_schema_initial() return self # TODO remove DEPRECATED @@ -69,7 +74,7 @@ class EcdcService: def download_DEPRECATED(self): app.logger.info(" download [begin]") app.logger.info("------------------------------------------------------------") - self.europe_service_download.download() + self.ecdc_service_download.download() app.logger.info(" download [done]") app.logger.info("------------------------------------------------------------") return self @@ -79,8 +84,8 @@ class EcdcService: def run_update_initial_DEPRECATED(self): app.logger.info(" run update [begin]") app.logger.info("------------------------------------------------------------") - self.europe_service_import.import_datafile_to_db() - self.europe_service_update.update_db_initial() + self.ecdc_service_import.import_datafile_to_db() + self.ecdc_service_update.update_db_initial() app.logger.info(" run update [done]") app.logger.info("------------------------------------------------------------") return self @@ -90,8 +95,8 @@ class EcdcService: def run_update_short_DEPRECATED(self): app.logger.info(" run update short [begin]") app.logger.info("------------------------------------------------------------") - self.europe_service_import.import_datafile_to_db() - self.europe_service_update.update_db_short() + self.ecdc_service_import.import_datafile_to_db() + self.ecdc_service_update.update_db_short() app.logger.info(" run update short [done]") app.logger.info("------------------------------------------------------------") return self diff --git a/src/covid19/blueprints/ecdc/ecdc_service_download.py b/src/covid19/blueprints/ecdc/ecdc_service_download.py index 50ffecb458df68605f62f7703861e58b61448473..ee198bebcf5177f0d1df30f4907195c553bc37a2 100644 --- a/src/covid19/blueprints/ecdc/ecdc_service_download.py +++ b/src/covid19/blueprints/ecdc/ecdc_service_download.py @@ -8,17 +8,17 @@ from covid19.blueprints.ecdc.ecdc_service_config import EcdcServiceConfig class EcdcServiceDownload: def __init__(self, database): app.logger.debug("------------------------------------------------------------") - app.logger.debug(" Europe Service Download [init]") + app.logger.debug(" ECDC Service Download [init]") app.logger.debug("------------------------------------------------------------") self.__database = database self.cfg = EcdcServiceConfig() app.logger.debug("------------------------------------------------------------") - app.logger.debug(" Europe Service Download [ready] ") + app.logger.debug(" ECDC Service Download [ready] ") def download(self): src_cvsfile_name = self.cfg.data_path+os.sep+self.cfg.cvsfile_name app.logger.info("------------------------------------------------------------") - app.logger.info(" download Europa [begin]") + app.logger.info(" download ECDC [begin]") app.logger.info("------------------------------------------------------------") app.logger.info(" FILE: "+self.cfg.cvsfile_name+" <- "+self.cfg.url_src_data) app.logger.info("------------------------------------------------------------") @@ -44,7 +44,7 @@ class EcdcServiceDownload: app.logger.info("############################################################") flash(message="error after downloading: " + src_cvsfile_name, category='error') finally: - app.logger.info(" download Europa [done]") + app.logger.info(" download ECDC [done]") msg = "downloaded: " + src_cvsfile_name + " " flash(msg) return self diff --git a/src/covid19/blueprints/ecdc/ecdc_service_import.py b/src/covid19/blueprints/ecdc/ecdc_service_import.py index ccd2b97c57cd2badfec6703e2679f43d08593e89..c07f06dd4bda814294477ba391cb1456add76154 100644 --- a/src/covid19/blueprints/ecdc/ecdc_service_import.py +++ b/src/covid19/blueprints/ecdc/ecdc_service_import.py @@ -10,19 +10,18 @@ from covid19.blueprints.ecdc.ecdc_service_config import EcdcServiceConfig class EcdcServiceImport: def __init__(self, database): app.logger.debug("------------------------------------------------------------") - app.logger.debug(" Europe Service Import [init]") + app.logger.debug(" ECDC Service Import [init]") app.logger.debug("------------------------------------------------------------") self.__database = database self.cfg = EcdcServiceConfig() app.logger.debug("------------------------------------------------------------") - app.logger.debug(" Europe Service Import [ready] ") + app.logger.debug(" ECDC Service Import [ready] ") def import_datafile_to_db(self): src_cvsfile_name = self.cfg.data_path+os.sep+self.cfg.cvsfile_name - app.logger.info(" import Europa [begin]") + app.logger.info(" import ECDC [begin]") app.logger.info("------------------------------------------------------------") - app.logger.info(" FILE: " + src_cvsfile_name) - app.logger.info(" TABLE: europe_data_import") + app.logger.info(" TABLE: ecdc_data_import <-- " + src_cvsfile_name) app.logger.info("------------------------------------------------------------") k = 0 try: @@ -47,20 +46,20 @@ class EcdcServiceImport: k = k + 1 if (k % 100) == 0: db.session.commit() - app.logger.info(" import Europa ... " + str(k) + " rows") + app.logger.info(" import ECDC ... " + str(k) + " rows") db.session.commit() - app.logger.info(" import Europa ... " + str(k) + " rows total") + app.logger.info(" import ECDC ... " + str(k) + " rows total") except KeyError as error: - app.logger.warning("KeyError: import Europa [begin]") + app.logger.warning("KeyError: import ECDC [begin]") app.logger.warning(":::" + str(error) + ":::") for item_key, item_value in row.items(): app.logger.warning(item_key + " : " + item_value) - app.logger.warning("KeyError: import Europa [end]") + app.logger.warning("KeyError: import ECDC [end]") except (Exception, psycopg2.DatabaseError) as error: - app.logger.warning("WARN: import Europa [begin]") + app.logger.warning("WARN: import ECDC [begin]") app.logger.warning(error) - app.logger.warning("WARN: import Europa [end]") + app.logger.warning("WARN: import ECDC [end]") finally: app.logger.info("------------------------------------------------------------") - app.logger.info(" import Europa [done]") + app.logger.info(" import ECDC [done]") return self diff --git a/src/covid19/blueprints/ecdc/ecdc_service_update.py b/src/covid19/blueprints/ecdc/ecdc_service_update.py index f86ab896cf320dd7199c116d4d3534950603ba49..6a6a604f56669e7c2cc69ef152637b5cbd1a1b74 100644 --- a/src/covid19/blueprints/ecdc/ecdc_service_update.py +++ b/src/covid19/blueprints/ecdc/ecdc_service_update.py @@ -8,12 +8,12 @@ from covid19.blueprints.ecdc.ecdc_model import EcdcDateReported, EcdcContinent, class EcdcServiceUpdate: def __init__(self, database): app.logger.debug("------------------------------------------------------------") - app.logger.debug(" Europe Service Update [init]") + app.logger.debug(" ECDC Service Update [init]") app.logger.debug("------------------------------------------------------------") self.__database = database self.cfg = EcdcServiceConfig() app.logger.debug("------------------------------------------------------------") - app.logger.debug(" Europe Service Update [ready] ") + app.logger.debug(" ECDC Service Update [ready] ") def __update_date_reported(self): app.logger.info(" __update_date_reported [begin]") @@ -22,13 +22,13 @@ class EcdcServiceUpdate: k = 0 for result_item in result_date_rep: k += 1 - my_date_rep = result_item['date_rep'] - my_year_week = result_item['year_week'] + #my_date_rep = result_item['date_rep'] + my_date_rep = result_item[0] o = EcdcDateReported.create_new_object_factory( my_date_rep=my_date_rep ) db.session.add(o) - app.logger.info("| " + my_date_rep + " | " + my_year_week + " | " + str(k) + " rows ") + app.logger.info("| " + my_date_rep + " | " + str(k) + " rows ") db.session.commit() app.logger.info(" __update_date_reported [done]") app.logger.info("------------------------------------------------------------") @@ -39,7 +39,8 @@ class EcdcServiceUpdate: app.logger.info("------------------------------------------------------------") result_continent = EcdcImport.get_continent() for result_item in result_continent: - my_continent_exp = result_item['continent_exp'] + #my_continent_exp = result_item['continent_exp'] + my_continent_exp = result_item[0] o = EcdcContinent( region=my_continent_exp ) @@ -76,44 +77,44 @@ class EcdcServiceUpdate: result_date_rep = EcdcImport.get_date_rep() i = 0 for item_date_rep in result_date_rep: - europe_date_reported = EcdcDateReported.find_by_date_reported( + ecdc_date_reported = EcdcDateReported.find_by_date_reported( i_date_reported=item_date_rep['date_rep'] ) - if europe_date_reported is None: + if ecdc_date_reported is None: o = EcdcDateReported.create_new_object_factory(item_date_rep['date_rep']) - europe_date_reported = o - result_europe_data_import = EcdcImport.find_by_date_reported(europe_date_reported) - for item_europe_data_import in result_europe_data_import: - my_a = item_europe_data_import.countries_and_territories - my_b = item_europe_data_import.geo_id - my_c = item_europe_data_import.country_territory_code - europe_country = EcdcCountry.find_by( + ecdc_date_reported = o + result_ecdc_data_import = EcdcImport.find_by_date_reported(ecdc_date_reported) + for item_ecdc_data_import in result_ecdc_data_import: + my_a = item_ecdc_data_import.countries_and_territories + my_b = item_ecdc_data_import.geo_id + my_c = item_ecdc_data_import.country_territory_code + ecdc_country = EcdcCountry.find_by( countries_and_territories=my_a, geo_id=my_b, country_territory_code=my_c ) - my_d = int(item_europe_data_import.deaths_weekly) - my_e = int(item_europe_data_import.cases_weekly) - if item_europe_data_import.notification_rate_per_100000_population_14days == '': + my_d = int(item_ecdc_data_import.deaths_weekly) + my_e = int(item_ecdc_data_import.cases_weekly) + if item_ecdc_data_import.notification_rate_per_100000_population_14days == '': my_f = 0.0 else: - my_f = float(item_europe_data_import.notification_rate_per_100000_population_14days) + my_f = float(item_ecdc_data_import.notification_rate_per_100000_population_14days) o = EcdcData( - europe_country=europe_country, - europe_date_reported=europe_date_reported, + ecdc_country=ecdc_country, + ecdc_date_reported=ecdc_date_reported, deaths_weekly=my_d, cases_weekly=my_e, notification_rate_per_100000_population_14days=my_f ) db.session.add(o) - item_europe_data_import.row_imported = True - db.session.add(item_europe_data_import) + item_ecdc_data_import.row_imported = True + db.session.add(item_ecdc_data_import) i += 1 if i % 500 == 0: - app.logger.info(" update Europa initial ... " + str(i) + " rows") + app.logger.info(" update EDCD initial ... " + str(i) + " rows") db.session.commit() db.session.commit() - app.logger.info(" update Europa initial ... " + str(i) + " rows total") + app.logger.info(" update ECDC initial ... " + str(i) + " rows total") app.logger.info(" __update_data_initial [done]") app.logger.info("------------------------------------------------------------") return self @@ -162,24 +163,50 @@ class EcdcServiceUpdate: def update_dimension_tables_only(self): # TODO: #118 implement EcdcServiceUpdate.update_dimension_tables_only # TODO: #117 refactor EcdcServiceUpdate to new method scheme introduced 07.02.2021 + EcdcData.remove_all() + EcdcCountry.remove_all() + EcdcContinent.remove_all() + self.__update_date_reported() + self.__update_continent() + self.__update_country() return self def update_fact_table_incremental_only(self): # TODO: #119 implement EcdcServiceUpdate.update_fact_table_incremental_only # TODO: #117 refactor EcdcServiceUpdate to new method scheme introduced 07.02.2021 + EcdcDateReported.remove_all() + self.__update_data_short() return self def update_fact_table_initial_only(self): # TODO: #120 implement EcdcServiceUpdate.update_fact_table_initial_only # TODO: #117 refactor EcdcServiceUpdate to new method scheme introduced 07.02.2021 + EcdcDateReported.remove_all() + self.__update_data_initial() return self def update_star_schema_incremental(self): # TODO: #121 implement EcdcServiceUpdate.update_star_schema_incremental # TODO: #117 refactor EcdcServiceUpdate to new method scheme introduced 07.02.2021 + EcdcData.remove_all() + EcdcCountry.remove_all() + EcdcContinent.remove_all() + EcdcDateReported.remove_all() + self.__update_date_reported() + self.__update_continent() + self.__update_country() + self.__update_data_short() return self def update_star_schema_initial(self): # TODO: #122 implement EcdcServiceUpdate.update_star_schema_initial # TODO: #117 refactor EcdcServiceUpdate to new method scheme introduced 07.02.2021 + EcdcData.remove_all() + EcdcCountry.remove_all() + EcdcContinent.remove_all() + EcdcDateReported.remove_all() + self.__update_date_reported() + self.__update_continent() + self.__update_country() + self.__update_data_initial() return self diff --git a/src/covid19/blueprints/ecdc/ecdc_views.py b/src/covid19/blueprints/ecdc/ecdc_views.py index 43cf9d205a04356a650b5ca7a97c4e3cf3e91842..535dd80f5a403064fe4525b19958010f0986e802 100644 --- a/src/covid19/blueprints/ecdc/ecdc_views.py +++ b/src/covid19/blueprints/ecdc/ecdc_views.py @@ -56,7 +56,7 @@ def task_ecdc_download_only(self): logger.info("------------------------------------------------------------") logger.info(" Received: task_ecdc_download_only [OK] ") logger.info("------------------------------------------------------------") - ecdc_service.task_download_only() # TODO + ecdc_service.run_download_only() self.update_state(state=states.SUCCESS) result = "OK (task_ecdc_download_only)" return result @@ -69,7 +69,7 @@ def task_ecdc_import_only(self): logger.info("------------------------------------------------------------") logger.info(" Received: task_ecdc_import_only [OK] ") logger.info("------------------------------------------------------------") - ecdc_service.task_import_only() # TODO + ecdc_service.run_import_only() self.update_state(state=states.SUCCESS) result = "OK (task_ecdc_import_only)" return result @@ -82,7 +82,7 @@ def task_ecdc_update_dimension_tables_only(self): logger.info("------------------------------------------------------------") logger.info(" Received: task_ecdc_update_dimension_tables_only [OK] ") logger.info("------------------------------------------------------------") - ecdc_service.task_update_dimension_tables_only() # TODO + ecdc_service.run_update_dimension_tables_only() self.update_state(state=states.SUCCESS) result = "OK (task_ecdc_update_dimension_tables_only)" return result @@ -95,22 +95,22 @@ def task_ecdc_update_fact_table_incremental_only(self): logger.info("------------------------------------------------------------") logger.info(" Received: task_ecdc_update_fact_table_incremental_only [OK] ") logger.info("------------------------------------------------------------") - ecdc_service.task_update_fact_table_incremental_only() # TODO + ecdc_service.run_update_fact_table_incremental_only() self.update_state(state=states.SUCCESS) result = "OK (task_ecdc_update_fact_table_incremental_only)" return result @celery.task(bind=True) -def task_ecdc_update_fact_table_initial_only(self): +def task_ecdc_update_fact_table_incremental_only(self): logger = get_task_logger(__name__) self.update_state(state=states.STARTED) logger.info("------------------------------------------------------------") - logger.info(" Received: task_ecdc_update_fact_table_initial_only [OK] ") + logger.info(" Received: task_ecdc_update_fact_table_incremental_only [OK] ") logger.info("------------------------------------------------------------") - ecdc_service.task_update_fact_table_initial_only() # TODO + ecdc_service.run_update_fact_table_incremental_only() self.update_state(state=states.SUCCESS) - result = "OK (task_ecdc_update_fact_table_initial_only)" + result = "OK (task_ecdc_update_fact_table_incremental_only)" return result @@ -121,7 +121,7 @@ def task_ecdc_update_fact_table_initial_only(self): logger.info("------------------------------------------------------------") logger.info(" Received: task_ecdc_update_fact_table_initial_only [OK] ") logger.info("------------------------------------------------------------") - ecdc_service.task_update_fact_table_initial_only() # TODO + ecdc_service.run_update_fact_table_initial_only() self.update_state(state=states.SUCCESS) result = "OK (task_ecdc_update_fact_table_initial_only)" return result @@ -134,7 +134,7 @@ def task_ecdc_update_star_schema_incremental(self): logger.info("------------------------------------------------------------") logger.info(" Received: task_ecdc_update_star_schema_incremental [OK] ") logger.info("------------------------------------------------------------") - ecdc_service.task_update_star_schema_incremental() # TODO + ecdc_service.run_update_star_schema_incremental() self.update_state(state=states.SUCCESS) result = "OK (task_ecdc_update_star_schema_incremental)" return result @@ -147,7 +147,7 @@ def task_ecdc_update_star_schema_initial(self): logger.info("------------------------------------------------------------") logger.info(" Received: task_ecdc_update_star_schema_initial [OK] ") logger.info("------------------------------------------------------------") - ecdc_service.task_update_star_schema_initial() # TODO + ecdc_service.run_update_star_schema_initial() self.update_state(state=states.SUCCESS) result = "OK (task_ecdc_update_star_schema_initial)" return result @@ -320,6 +320,8 @@ def url_ecdc_task_update_data_short_DEPRECATED(): @app_ecdc.route('/task/update/star_schema/initial') def url_ecdc_task_update_star_schema_initial(): flash("url_ecdc_task_update_star_schema_initial started") + ecdc_service.run_download_only() + task_ecdc_update_star_schema_initial.apply_async() return redirect(url_for('ecdc.url_ecdc_tasks')) @@ -327,6 +329,8 @@ def url_ecdc_task_update_star_schema_initial(): @app_ecdc.route('/task/update/star_schema/incremental') def url_ecdc_task_update_starschema_incremental(): flash("url_ecdc_task_update_starschema_incremental started") + ecdc_service.run_download_only() + task_ecdc_update_star_schema_incremental.apply_async() return redirect(url_for('ecdc.url_ecdc_tasks')) @@ -334,6 +338,7 @@ def url_ecdc_task_update_starschema_incremental(): @app_ecdc.route('/task/download/only') def url_ecdc_task_download_only(): flash("url_ecdc_task_download_only started") + ecdc_service.run_download_only() return redirect(url_for('ecdc.url_ecdc_tasks')) @@ -341,6 +346,7 @@ def url_ecdc_task_download_only(): @app_ecdc.route('/task/import/only') def url_ecdc_task_import_only(): flash("url_ecdc_task_import_only started") + task_ecdc_import_only.apply_async() return redirect(url_for('ecdc.url_ecdc_tasks')) @@ -348,6 +354,7 @@ def url_ecdc_task_import_only(): @app_ecdc.route('/task/update/dimension-tables/only') def url_ecdc_task_update_dimensiontables_only(): flash("url_ecdc_task_update_dimensiontables_only started") + task_ecdc_update_dimension_tables_only.apply_async() return redirect(url_for('ecdc.url_ecdc_tasks')) @@ -355,6 +362,7 @@ def url_ecdc_task_update_dimensiontables_only(): @app_ecdc.route('/task/update/fact-table/incremental/only') def url_ecdc_task_update_facttable_incremental_only(): flash("url_ecdc_task_update_facttable_incremental_only started") + task_ecdc_update_fact_table_incremental_only.apply_async() return redirect(url_for('ecdc.url_ecdc_tasks')) @@ -362,4 +370,5 @@ def url_ecdc_task_update_facttable_incremental_only(): @app_ecdc.route('/task/update/fact-table/initial/only') def url_ecdc_task_update_facttable_initial_only(): flash("url_ecdc_task_update_facttable_initial_only started") + task_ecdc_update_fact_table_initial_only.apply_async() return redirect(url_for('ecdc.url_ecdc_tasks'))