"""Scrape a recipe site for recipes and store them (plus parsed ingredients) in the DB.

Usage:
    python scrape.py SITE -id IDENTIFIER        # one recipe
    python scrape.py SITE -a N [-id START_ID]   # N sequential recipes
"""

import logging
import re
from argparse import ArgumentParser
from urllib.parse import urljoin
from urllib.request import urlopen

import bs4
from sqlalchemy import desc, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker

import db

# Units recognised at the start of an ingredient line.
# FIX(review): original list contained the typo 'once'; 'ounce' is the intended unit.
UNITS = ['teaspoon', 'tablespoon', 'gram', 'ounce', 'jar', 'cup', 'pinch']


def parse_ingredient(ingredient_text):
    """Split an ingredient line into its parts.

    Returns a list ``[quantity, unit, ingredient, supplement]`` of stripped
    strings, or ``None`` when the line does not match (e.g. no recognised
    unit is present).
    """
    # Quantity: ASCII digits plus unicode vulgar fractions (U+00BC-U+00BE,
    # U+2150-U+215E), optionally followed by a parenthesised note, e.g.
    # "1 (14 oz)".  Raw strings avoid invalid-escape warnings.
    number_regex = r'((?:[\d\u00BC-\u00BE\u2150-\u215E]*\s?(?:\(.+\))?)*)'
    ingredient_regex = r"([a-zA-Z '\-]+)"
    supplement_regex = r',?(.*)'
    # Each unit matches with either capitalisation of its first letter and an
    # optional plural 's'.
    alternatives = "|".join(f'[{unit[0]}{unit[0].upper()}]{unit[1:]}' for unit in UNITS)
    units_regex = f'((?:{alternatives})[s]?)'
    regex = re.compile(number_regex + units_regex + ingredient_regex + supplement_regex)
    m = regex.match(ingredient_text)
    logging.info(f"Parsed {ingredient_text}, found: {m}")
    if not m:
        return None
    return [text.strip() for text in m.groups()]


def load_recipe(recipe_url):
    """Download ``recipe_url`` and return a parsed BeautifulSoup document.

    Best-effort: any download or parse failure is logged and reported as
    ``None`` so a bulk scrape can decide how to proceed.
    """
    try:
        logging.info(f'Loading Recipe: {recipe_url}')
        with urlopen(recipe_url) as f:
            # urlopen normally raises HTTPError for 404s; this guards servers
            # that return the status code without an error body.
            if f.getcode() == 404:
                raise Exception(f"Recipe Does not exist: {recipe_url}")
            return bs4.BeautifulSoup(f.read().decode(), 'html.parser')
    except Exception as e:
        logging.warning(f"Could not download or parse recipe: {recipe_url}")
        logging.warning(e)
        return None


def parse_recipe(session, recipe, site):
    """Scrape one recipe page and persist it with its ingredients.

    Args:
        session: active SQLAlchemy session (caller owns the transaction).
        recipe: a ``db.Recipe`` with ``identifier`` set; name is filled in here.
        site: a ``db.RecipeSite`` providing ``base_url`` and the CSS classes
            used to locate the recipe name and ingredient elements.

    Returns the populated recipe, or ``None`` if the page could not be loaded.
    Raises if the page loads but no recipe name can be found.
    """
    recipe_url = urljoin(site.base_url, str(recipe.identifier))
    recipe_page = load_recipe(recipe_url)
    if not recipe_page:
        return None

    name_candidates = recipe_page.find_all(class_=site.name_class)
    if not name_candidates:
        raise Exception(f"Could not extract recipe name: {recipe_url}")
    recipe.name = name_candidates[0].text

    logging.info(f"Adding Recipe {recipe}")
    session.add(recipe)
    session.flush()  # assigns recipe.id for the FK below

    ingred_candidates = recipe_page.find_all(class_=site.ingredient_class)
    for candidate in ingred_candidates:
        ingred = db.RecipeIngredient(text=candidate.text, recipe_id=recipe.id)
        session.add(ingred)
        session.flush()  # assigns ingred.id for the parts row
        parts = parse_ingredient(ingred.text)
        if parts:
            quantity, unit, ingredient, supplement = parts
            ingred_parts = db.RecipeIngredientParts(
                id=ingred.id,
                quantity=quantity,
                unit=unit,
                ingredient=ingredient,
                supplement=supplement,
            )
            session.add(ingred_parts)
    logging.info(f"{len(ingred_candidates)} ingredients found. Inserting into DB")
    return recipe


def main():
    """Parse CLI arguments and scrape the requested recipe(s)."""
    parser = ArgumentParser(description="Scrape a recipe site for recipes")
    parser.add_argument('site', help='Name of site')
    parser.add_argument('-id', '--identifier', dest='id',
                        help='url of recipe (relative to base url of site) '
                             'or comma separated list')
    parser.add_argument('-a', '--auto', action='store', dest='n',
                        help='automatically generate identifier '
                             '(must supply number of recipes to scrape)')
    parser.add_argument('-v', '--verbose', action='store_true')
    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.INFO)
        logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

    eng = db.get_engine()
    S = sessionmaker(eng)
    with S.begin() as sess:
        site = sess.query(db.RecipeSite).where(db.RecipeSite.name == args.site).one()

        recipe_ids = []
        starting_id = 0
        if args.id and not args.n:
            recipe_ids.append(args.id)
            logging.info(f'Retrieving single recipe: {args.id}')
        elif args.n:
            if not args.id:
                # Continue from the highest identifier already scraped.
                last_recipe = sess.query(db.Recipe).\
                    where(db.Recipe.recipe_site_id == site.id).\
                    order_by(desc(db.Recipe.identifier)).\
                    limit(1).\
                    scalar()
                # FIX(review): previously crashed with AttributeError when no
                # recipe existed yet; start from 0 in that case.
                if last_recipe is not None:
                    starting_id = int(last_recipe.identifier) + 1
            else:
                starting_id = int(args.id)
            recipe_ids = range(starting_id, starting_id + int(args.n))
            logging.info(f'Retrieving {args.n} recipes from {site.base_url} '
                         f'starting at {starting_id}')

        for recipe_id in recipe_ids:
            try:
                # Nested transaction (SAVEPOINT): the context manager commits on
                # success and rolls back on exception, so a single bad recipe
                # never poisons the outer transaction.  (The previous manual
                # savepoint could be referenced before assignment in `except`.)
                with sess.begin_nested():
                    recipe = db.Recipe(identifier=recipe_id, recipe_site_id=site.id)
                    parse_recipe(sess, recipe, site)
            except KeyboardInterrupt:
                break
            except Exception as e:
                # Stop the whole run on the first failure (preserves original
                # behaviour — avoids hammering a site that is erroring).
                logging.error(e)
                break


if __name__ == '__main__':
    main()