# Source: Wikidata wiki page "User:Checkallthestrings bot/Task 7".
#
import pywikibot
import csv
from pywikibot import pagegenerators as pg
import time

# Input CSV; the main loop reads its 'species' and 'BacDive_ID' columns.
inp_file = "bacdive.csv"
# NOTE(review): not referenced anywhere in the visible code.
out_file = "bacdive_out.csv"
# SPARQL template selecting items whose P225 value equals the taxon name.
# '{0}' is filled in with str.format; the doubled braces are str.format
# escapes that produce the literal '{' and '}' of the WHERE clause.
sparql_query = "SELECT ?item WHERE {{?item wdt:P225 '{0}' .}}"
# Site object passed to every pywikibot call in this script.
wikidata_site = pywikibot.Site("wikidata", "wikidata")
# Property under which the BacDive ID statement is looked up and created.
bacdive_property = "P2946"
# Item used as the target of the P248 reference built in create_source.
bacdive_item = "Q25349390"

def check_bacdive_statement(item_dict, property_id=None):
    """Return the first claim stored under a property, or "" if there is none.

    Args:
        item_dict: Mapping as returned by ``item.get()`` in the main loop.
            Only ``item_dict['claims'][property_id]`` is read.
        property_id: Key to look up inside ``item_dict['claims']``. When
            omitted, the module-level ``bacdive_property`` is used.

    Returns:
        The first element of the claim list for the property, or the empty
        string when the lookup cannot be completed.
    """
    if property_id is None:
        property_id = bacdive_property

    try:
        return item_dict['claims'][property_id][0]
    except (KeyError, IndexError, TypeError):
        # A missing 'claims' key, a missing property, an empty claim list or
        # a non-subscriptable item_dict all mean "no statement present".
        # Unlike a bare ``except``, this lets KeyboardInterrupt/SystemExit
        # and unrelated errors propagate.
        return ""

def create_source(site, claim):
    """Attach a two-part reference to ``claim``.

    The reference is made of a P248 claim whose target is the item named by
    the module-level ``bacdive_item`` and a P813 claim carrying the fixed
    date 2016-07-01. Both are added with a single ``claim.addSources`` call.

    Args:
        site: Site object used to build the item page and reference claims.
        claim: Claim that receives the reference.
    """
    # Item-valued part of the reference.
    referenced_item = pywikibot.ItemPage(site, bacdive_item)
    item_reference = pywikibot.Claim(site, "P248", isReference=True)
    item_reference.setTarget(referenced_item)

    # Date-valued part of the reference (hard-coded date).
    date_reference = pywikibot.Claim(site, "P813", isReference=True)
    date_reference.setTarget(pywikibot.WbTime(year=2016, month=7, day=1))

    claim.addSources([item_reference, date_reference])

def create_bacdive_statement(site, item, bacdive_id):
    """Add a ``bacdive_property`` claim with value ``bacdive_id`` to ``item``.

    Args:
        site: Site object used to build the claim.
        item: Item page that receives the claim via ``addClaim``.
        bacdive_id: Value set as the claim target.

    Returns:
        The claim object that was added, so the caller can attach sources.
    """
    statement = pywikibot.Claim(site, bacdive_property)
    statement.setTarget(bacdive_id)
    # NOTE(review): the summary text names P2946 literally, independent of
    # the value of bacdive_property.
    item.addClaim(statement, summary="Setting Bacdive ID P2946")
    return statement

def create_item(site, label):
    """Build an ``ItemPage`` without an ID, label it, and return its ID.

    The same ``label`` is written as both the "en" and the "de" label.

    Args:
        site: Site object the item page is built on.
        label: Text used for both labels.

    Returns:
        Whatever ``getID()`` reports for the item after the label edit.
    """
    labels_by_language = dict.fromkeys(("en", "de"), label)
    fresh_item = pywikibot.ItemPage(site)
    fresh_item.editLabels(
        labels=labels_by_language,
        summary="Setting taxon name as label",
    )
    return fresh_item.getID()

def create_taxo_statments(site, item, label):
    """Add a P225 claim with value ``label`` to ``item``.

    Args:
        site: Site object used to build the claim.
        item: Item page that receives the claim via ``addClaim``.
        label: Value set as the claim target.

    Returns:
        The claim object that was added, so the caller can attach sources.
    """
    taxon_claim = pywikibot.Claim(site, "P225")
    taxon_claim.setTarget(label)
    item.addClaim(taxon_claim, summary="Set taxon name P225")
    return taxon_claim

def create_new_item_statements(new_item_id, wikidata_site, label, bacdive_id):
    """Add the sourced taxon-name and BacDive statements to an item.

    Args:
        new_item_id: ID of the item to load and extend.
        wikidata_site: Site object used for the item page and all claims.
        label: Value for the P225 claim.
        bacdive_id: Value for the BacDive ID claim.
    """
    target_item = pywikibot.ItemPage(wikidata_site, new_item_id)

    # Taxon name first, BacDive ID second; each claim gets its reference
    # immediately after it has been added.
    statement_steps = (
        (create_taxo_statments, label),
        (create_bacdive_statement, bacdive_id),
    )
    for add_statement, value in statement_steps:
        added_claim = add_statement(wikidata_site, target_item, value)
        create_source(wikidata_site, added_claim)

# Main script. For every CSV row, find the items whose P225 value equals the
# species name. Existing items that lack a BacDive statement get one (with a
# reference). If the query yields no item, a new item is created and given
# sourced taxon-name and BacDive statements.
#
# newline="" is the csv-module recommended way to open the file so that
# newlines embedded in quoted fields are handled by the reader.
with open(inp_file, newline="") as inp_f:
    reader = csv.DictReader(inp_f)

    for row in reader:
        print("==============================")
        # DictReader fills missing trailing fields with None; treat those
        # like empty strings instead of failing on None.rstrip().
        label = (row['species'] or "").rstrip()
        bacdive_id = (row['BacDive_ID'] or "").rstrip()

        if not label or not bacdive_id:
            # An empty name would lead to an item with an empty label and an
            # empty ID to an empty claim, so skip incomplete rows.
            print('Skipping row with missing species or BacDive_ID')
            continue

        # The template wraps the name in single quotes; escape backslashes
        # and single quotes so the name cannot terminate the SPARQL literal.
        escaped_label = label.replace("\\", "\\\\").replace("'", "\\'")
        custom_query = sparql_query.format(escaped_label)
        generator = pg.WikidataSPARQLPageGenerator(custom_query, site=wikidata_site)

        species_exists = False
        for item in generator:
            item_dict = item.get()
            species_exists = True
            # Check if statement exists.
            existing_id = check_bacdive_statement(item_dict)

            # If it is missing create the statement
            if existing_id == "":
                claim = create_bacdive_statement(wikidata_site, item, bacdive_id)
                create_source(wikidata_site, claim)

        if not species_exists:
            print('Creating item')
            new_item_id = create_item(wikidata_site, label)
            # NOTE(review): presumably gives the freshly created item time to
            # become available before it is edited again; confirm.
            time.sleep(5)
            create_new_item_statements(new_item_id, wikidata_site, label, bacdive_id)