Skip to content
Snippets Groups Projects
Commit cc7ad204 authored by thomaswoehlke's avatar thomaswoehlke
Browse files

dev for Milestone 0.0.14

parent 596009a7
No related branches found
No related tags found
2 merge requests!161Issue 66 attempt 01,!160Issue 66 attempt 01
from database import db, app
from covid19.blueprints.rki.rki_model import RkiRegion, RkiDateReported, RkiCountry, RkiGermanyData
from covid19.blueprints.rki.rki_model_import import RkiGermanyDataImportTable
from covid19.blueprints.rki.rki_model_import import RkiBundeslaenderImport
rki_service_update = None
......@@ -21,7 +21,7 @@ class RkiServiceUpdate:
app.logger.info(" update who_date_reported [begin]")
app.logger.info("------------------------------------------------------------")
i = 0
for i_date_reported, in RkiGermanyDataImportTable.get_dates_reported():
for i_date_reported, in RkiBundeslaenderImport.get_dates_reported():
c = RkiDateReported.find_by_date_reported(i_date_reported)
if c is None:
o = RkiDateReported.create_new_object_factory(my_date_rep=i_date_reported)
......
......@@ -16,45 +16,66 @@ class WhoService:
app.logger.debug("------------------------------------------------------------")
app.logger.info(" WHO Service [ready]")
def run_download(self):
app.logger.info(" run update [begin]")
def run_download_only(self):
app.logger.info(" run_download_only [begin]")
app.logger.info("------------------------------------------------------------")
success = self.who_service_download.download_file()
app.logger.info("")
app.logger.info(" run update [done]")
app.logger.info(" run_download_only [done]")
app.logger.info("------------------------------------------------------------")
return success
def run_update(self, import_file=True):
app.logger.info(" run update [begin]")
def run_import_only(self):
app.logger.info(" run_import_only [begin]")
app.logger.info("------------------------------------------------------------")
if import_file:
self.who_service_import.import_file()
self.who_service_update.update_db()
self.who_service_import.import_file()
app.logger.info("")
app.logger.info(" run update [done]")
app.logger.info(" run_import_only [done]")
app.logger.info("------------------------------------------------------------")
return self
def run_update_short(self, import_file=True):
app.logger.info(" run update short [begin]")
def run_update_dimension_tables_only(self):
app.logger.info(" run_update_only [begin]")
app.logger.info("------------------------------------------------------------")
if import_file:
self.who_service_import.import_file()
self.who_service_update.update_db_short()
self.who_service_update.update_dimension_tables_only()
app.logger.info("")
app.logger.info(" run update short [done]")
app.logger.info(" run_update_only [done]")
app.logger.info("------------------------------------------------------------")
return self
def run_update_initial(self, import_file=True):
app.logger.info(" run update initial [begin]")
def run_update_fact_table_incremental_only(self):
app.logger.info(" run_update_only [begin]")
app.logger.info("------------------------------------------------------------")
if import_file:
self.who_service_import.import_file()
self.who_service_update.update_db_initial()
self.who_service_update.update_fact_table_incremental_only()
app.logger.info("")
app.logger.info(" run update initial [done]")
app.logger.info(" run_update_only [done]")
app.logger.info("------------------------------------------------------------")
return self
def run_update_fact_table_initial_only(self):
app.logger.info(" run_update_initial [begin]")
app.logger.info("------------------------------------------------------------")
self.who_service_update.update_fact_table_initial_only()
app.logger.info("")
app.logger.info(" run_update_initial [done]")
app.logger.info("------------------------------------------------------------")
return self
def run_update_star_schema_incremental(self):
app.logger.info(" run_update_short [begin]")
app.logger.info("------------------------------------------------------------")
self.who_service_update.update_star_schema_incremental()
app.logger.info("")
app.logger.info(" run_update_short [done]")
app.logger.info("------------------------------------------------------------")
return self
def run_update_star_schema_initial(self):
app.logger.info(" run_update_initial_full [begin]")
app.logger.info("------------------------------------------------------------")
self.who_service_update.update_star_schema_initial()
app.logger.info("")
app.logger.info(" run_update_initial_full [done]")
app.logger.info("------------------------------------------------------------")
return self
......@@ -15,7 +15,7 @@ class WhoServiceUpdate:
app.logger.debug(" WHO Service Update [ready]")
def __update_who_date_reported(self):
app.logger.info(" update who_date_reported [begin]")
app.logger.info(" __update_who_date_reported [begin]")
app.logger.info("------------------------------------------------------------")
for i_date_reported, in WhoGlobalDataImportTable.get_dates_reported():
c = WhoDateReported.find_by_date_reported(i_date_reported)
......@@ -27,12 +27,12 @@ class WhoServiceUpdate:
else:
app.logger.info(" update who_date_reported "+i_date_reported+" NOT added "+str(c.id))
app.logger.info("")
app.logger.info(" update who_date_reported [done]")
app.logger.info(" __update_who_date_reported [done]")
app.logger.info("------------------------------------------------------------")
return self
def __update_who_region(self):
app.logger.info(" update who_region [begin]")
app.logger.info(" __update_who_region [begin]")
app.logger.info("------------------------------------------------------------")
for i_who_region, in WhoGlobalDataImportTable.get_regions():
c = WhoRegion.find_by_region(i_who_region)
......@@ -44,12 +44,12 @@ class WhoServiceUpdate:
else:
app.logger.info(i_who_region + " NOT added "+str(c.id))
app.logger.info("")
app.logger.info(" update who_region [done]")
app.logger.info(" __update_who_region [done]")
app.logger.info("------------------------------------------------------------")
return self
def __update_who_country(self):
app.logger.info(" update who_country [begin]")
app.logger.info(" __update_who_country [begin]")
app.logger.info("------------------------------------------------------------")
result = WhoGlobalDataImportTable.countries()
for result_item in result:
......@@ -78,48 +78,12 @@ class WhoServiceUpdate:
app.logger.info(output)
db.session.commit()
app.logger.info("")
app.logger.info(" update who_country [done]")
app.logger.info(" __update_who_country [done]")
app.logger.info("------------------------------------------------------------")
return self
def __update_who_global_data(self):
app.logger.info(" update WHO [begin]")
app.logger.info("------------------------------------------------------------")
#dates_reported = WhoDateReported.get_all_as_dict()
#countries = WhoCountry.get_all_as_dict()
i = 0
result = WhoGlobalDataImportTable.get_all()
for result_item in result:
if result_item.country_code != "":
my_country = WhoCountry.find_by_country_code(result_item.country_code) #countries[result_item.country_code]
else:
my_country = WhoCountry.find_by_country(result_item.country)
my_date_reported = WhoDateReported.find_by_date_reported(result_item.date_reported) #dates_reported[result_item.date_reported]
result_who_global_data = WhoGlobalData.find_one_or_none_by_date_and_country(
my_date_reported,
my_country)
if result_who_global_data is None:
o = WhoGlobalData(
cases_new=int(result_item.new_cases),
cases_cumulative=int(result_item.cumulative_cases),
deaths_new=int(result_item.new_deaths),
deaths_cumulative=int(result_item.cumulative_deaths),
date_reported=my_date_reported,
country=my_country
)
db.session.add(o)
i += 1
if i % 2000 == 0:
app.logger.info(" update WHO ... "+str(i)+" rows")
db.session.commit()
db.session.commit()
app.logger.info(" update WHO : "+str(i)+" rows total")
app.logger.info(" update WHO [done]")
app.logger.info("------------------------------------------------------------")
return self
def __update_who_global_data_short(self):
app.logger.info(" update WHO short [begin]")
def __update_fact_table_incremental(self):
app.logger.info(" __update_fact_tables_incremental [begin]")
app.logger.info("------------------------------------------------------------")
new_dates_reported_from_import = WhoGlobalDataImportTable.get_new_dates_as_array()
i = 0
......@@ -150,12 +114,12 @@ class WhoServiceUpdate:
db.session.commit()
app.logger.info(" update WHO short ... " + str(i) + " rows ["+ str(my_date) +"]")
app.logger.info(" update WHO short : "+str(i)+" rows total")
app.logger.info(" update WHO short [done]")
app.logger.info(" __update_fact_tables_incremental [done]")
app.logger.info("------------------------------------------------------------")
return self
def __update_who_global_data_initial(self):
app.logger.info(" update WHO initial [begin]")
def __update_fact_table_initial(self):
app.logger.info(" __update_fact_table_initial [begin]")
app.logger.info("------------------------------------------------------------")
WhoGlobalData.remove_all()
new_dates_reported_from_import = WhoGlobalDataImportTable.get_new_dates_as_array()
......@@ -185,39 +149,54 @@ class WhoServiceUpdate:
db.session.commit()
db.session.commit()
app.logger.info(" update WHO initial : "+str(i)+" total rows")
app.logger.info(" update WHO initial [done]")
app.logger.info(" __update_fact_table_initial [done]")
app.logger.info("------------------------------------------------------------")
return self
def update_db(self):
app.logger.info(" update db [begin]")
app.logger.info("------------------------------------------------------------")
def __update_dimension_tables(self):
self.__update_who_date_reported()
self.__update_who_region()
self.__update_who_country()
self.__update_who_global_data()
app.logger.info(" update db [done]")
return self
def update_dimension_tables_only(self):
app.logger.info(" update_dimension_tables_only [begin]")
app.logger.info("------------------------------------------------------------")
self.__update_dimension_tables()
app.logger.info(" update_dimension_tables_only [done]")
app.logger.info("------------------------------------------------------------")
return self
def update_db_short(self):
app.logger.info(" update db short [begin]")
def update_fact_table_incremental_only(self):
app.logger.info(" update_fact_tables_incremental_only [begin]")
app.logger.info("------------------------------------------------------------")
self.__update_who_date_reported()
self.__update_who_region()
self.__update_who_country()
self.__update_who_global_data_short()
app.logger.info(" update db short [done]")
self.__update_fact_table_incremental()
app.logger.info(" update_fact_tables_incremental_only [done]")
app.logger.info("------------------------------------------------------------")
return self
def update_db_initial(self):
app.logger.info(" update db initial [begin]")
def update_fact_table_initial_only(self):
app.logger.info(" update_fact_tables_initial_only [begin]")
app.logger.info("------------------------------------------------------------")
self.__update_who_date_reported()
self.__update_who_region()
self.__update_who_country()
self.__update_who_global_data_initial()
app.logger.info(" update db initial [done]")
self.__update_fact_table_initial()
app.logger.info(" update_fact_tables_initial_only [done]")
app.logger.info("------------------------------------------------------------")
return self
def update_star_schema_incremental(self):
app.logger.info(" update_star_schema_incremental [begin]")
app.logger.info("------------------------------------------------------------")
self.__update_dimension_tables()
self.__update_fact_table_incremental()
app.logger.info(" update_star_schema_incremental [done]")
app.logger.info("------------------------------------------------------------")
return self
def update_star_schema_initial(self):
app.logger.info(" update_star_schema_initial [begin]")
app.logger.info("------------------------------------------------------------")
self.__update_dimension_tables()
self.__update_fact_table_initial()
app.logger.info(" update_star_schema_initial [done]")
app.logger.info("------------------------------------------------------------")
return self
......@@ -23,11 +23,11 @@ app_who = Blueprint('who', __name__, template_folder='templates')
@celery.task(bind=True)
def task_who_run_update(self, import_file=True):
def task_who_run_update_full(self, import_file=True):
self.update_state(state=states.STARTED)
logger = get_task_logger(__name__)
logger.info("------------------------------------------------------------")
logger.info(" Received: task_who_run_update [OK] ")
logger.info(" Received: task_who_run_update_full [OK] ")
logger.info("------------------------------------------------------------")
who_service.run_update(import_file)
self.update_state(state=states.SUCCESS)
......@@ -61,6 +61,32 @@ def task_who_update_initial(self):
return result
@celery.task(bind=True)
def task_who_import_only(self):
logger = get_task_logger(__name__)
self.update_state(state=states.STARTED)
logger.info("------------------------------------------------------------")
logger.info(" Received: task_who_import_only [OK] ")
logger.info("------------------------------------------------------------")
who_service.who_service_import.import_file()
self.update_state(state=states.SUCCESS)
result = "OK (task_who_import_only)"
return result
@celery.task(bind=True)
def task_who_update_only(self):
logger = get_task_logger(__name__)
self.update_state(state=states.STARTED)
logger.info("------------------------------------------------------------")
logger.info(" Received: task_who_update_only [OK] ")
logger.info("------------------------------------------------------------")
who_service.who_service_update.update_only_db()
self.update_state(state=states.SUCCESS)
result = "OK (task_who_update_only)"
return result
@app_who.route('/info')
def url_who_info():
page_info = ApplicationPage('WHO', "Info")
......@@ -354,30 +380,64 @@ def url_who_germany(page=1):
page_info=page_info)
@app_who.route('/update')
def url_who_update_run():
app.logger.info("url_who_update_run [start]")
@app_who.route('/task/download/only')
def url_who_task_download_only():
app.logger.info("url_who_task_download_only [start]")
who_service.who_service_download.download_file()
task_who_run_update.apply_async()
flash("who_service.who_service_download.download_file ok")
flash(message="long running background task started", category="warning")
app.logger.info("url_who_task_download_only [done]")
return redirect(url_for('url_who_tasks'))
@app_who.route('/task/import/only')
def url_who_task_import_only():
app.logger.info("url_who_update_run [start]")
task_who_import_only.apply_async()
flash("who_service.run_update started")
flash(message="long running background task started", category="warning")
app.logger.info("url_who_update_run [done]")
return redirect(url_for('url_home'))
return redirect(url_for('url_who_tasks'))
@app_who.route('/update/short')
def url_who_update_short_run():
@app_who.route('/task/update/only')
def url_who_task_update_only():
app.logger.info("url_who_update_run [start]")
task_who_update_only.apply_async()
flash("task_who_update_only started")
flash(message="long running background task started", category="warning")
app.logger.info("url_who_update_run [done]")
return redirect(url_for('url_who_tasks'))
@app_who.route('/task/update/initial')
def url_who_task_update_initial():
who_service.who_service_download.download_file()
flash("who_service.who_service_download.download_file ok")
task_who_update_initial.apply_async()
flash("task_who_update_initial started")
flash(message="long running background task started", category="warning")
return redirect(url_for('url_who_tasks'))
@app_who.route('/task/update/short')
def url_who_task_update_short():
who_service.who_service_download.download_file()
flash("who_service.who_service_download.download_file ok")
task_who_update_short.apply_async()
flash("who_service.run_update_short started")
flash("task_who_update_short started")
flash(message="long running background task started", category="warning")
return redirect(url_for('url_home'))
return redirect(url_for('url_who_tasks'))
@app_who.route('/update/initial')
def url_who_update_initial_run():
@app_who.route('/task/update/full')
def url_who_task_update_full():
app.logger.info("url_who_task_update_full [start]")
who_service.who_service_download.download_file()
task_who_update_initial.apply_async()
flash("who_service.run_update_short started")
flash("who_service.who_service_download.download_file ok")
task_who_run_update_full.apply_async()
flash("task_who_run_update_full started")
flash(message="long running background task started", category="warning")
return redirect(url_for('url_home'))
app.logger.info("url_who_task_update_full [done]")
return redirect(url_for('url_who_tasks'))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment