Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
jupyter-naas
GitHub Repository: jupyter-naas/awesome-notebooks
Path: blob/master/Agicap/Agicap_Get_transactions_by_account.ipynb
2973 views
Kernel: Python 3

Agicap.png

Agicap - Get transactions by account

Give Feedback | Bug report

Tags: #agicap #forecast #company #data #python

Last update: 2023-09-18 (Created: 2023-09-18)

Description: This notebook is designed to retrieve all transactions for a specified company and account from Agicap. It will then organize this data into a structured DataFrame for easy analysis. The DataFrame returned contains the following columns:

  • 'ENTREPRISE_ID': This column represents the unique identifier of the company.

  • 'COMPTE_ID': This column indicates the specific account ID related to the transaction.

  • 'TRANSACTION_ID': This column holds the unique transaction ID.

  • 'TRANSACTION_NAME': This column contains the name or description of the transaction.

  • 'CATEGORY_ID': This column represents the unique identifier of the transaction category.

  • 'CATEGORY_NAME': This column contains the name of the transaction category.

  • 'PROJECTS': This column is intended for any project-related information linked with the transaction.

  • 'CURRENCY': This column indicates the currency in which the transaction was made.

  • 'DATE_ORDER': This column holds the order date of the transaction in Unix timestamp format.

  • 'DATE': This column contains the date of the transaction in 'DD/MM/YYYY' format.

  • 'VALUE': This column represents the monetary value of the transaction.

Input

Import libraries

import requests import naas import pandas as pd import json

Setup variables

Mandatory

  • username: Agicap username

  • password: Agicap password

  • enterprise_id: Agicap enterprise ID. Your Agicap account manager can provide you all your enterprises/accounts ids.

  • account_id: Agicap account ID. Your Agicap account manager can provide you with all your enterprise/account IDs.

Optional

  • output_csv: csv file path to be saved as output

# Mandatory username = naas.secret.get('AGICAP_USERNAME') or "<username>" password = naas.secret.get('AGICAP_PASSWORD') or "<password>" enterprise_id = "00001" account_id = "00001" # Optional output_csv = f"Transactions_{enterprise_id}_{account_id}.csv"

Model

Get token from Agicap

Get token using user credentials

def get_token( username=None, password=None, force_update=False, ): # Get credentials if not username: username = naas.secret.get('AGICAP_USERNAME') if not password: password = naas.secret.get('AGICAP_PASSWORD') # Check if token exists token = naas.secret.get('AGICAP_TOKEN') if token and not force_update: return token # Sign in to get token url = "https://business-definition.agicap.com/signin" headers = { "Accept": "application/json, text/plain, */*", "Content-Type": "application/json" } payload = { "Username": username, "Password": password } res = requests.post(url, headers=headers, json=payload) res.raise_for_status # Get agicap token if len(res.json()) > 0: token = res.json().get("token") if token != naas.secret.get('AGICAP_TOKEN'): naas.secret.add('AGICAP_TOKEN', token) else: print('Error while connecting to AGICAP!') return token token = get_token(username, password)

Get transactions

Get all transactions using API

def get_export(token, company_id, url, payload):
    """POST `payload` to an Agicap endpoint on behalf of `company_id`.

    Parameters
    ----------
    token : str
        Agicap bearer token.
    company_id : str or int
        Agicap enterprise ID, sent in the 'EntrepriseId' header.
    url : str
        Endpoint URL.
    payload : dict
        JSON body for the request.

    Returns
    -------
    requests.Response
        The raw response; the caller inspects the status code.
    """
    headers = {
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "fr",
        "Authorization": f"Bearer {token}",
        "EntrepriseId": str(company_id),
        "Content-Type": "application/json"
    }
    res = requests.post(url, headers=headers, json=payload)
    return res


# Flatten the nested dict
def flatten_dict(d, parent_key='', sep='_'):
    """
    Flattens a nested dictionary into a single level dictionary.

    Args:
        d (dict): A nested dictionary.
        parent_key (str): Optional string to prefix the keys with.
        sep (str): Optional separator to use between parent_key and child_key.

    Returns:
        dict: A flattened dictionary.
    """
    items = []
    for k, v in d.items():
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict):
            items.extend(flatten_dict(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)


def get_transactions(
    company_id,
    account_id,
    token=None,
    company_name=None,
    account_name=None,
):
    """Fetch all paid transactions of one bank account, paginated 100 a time.

    Parameters
    ----------
    company_id : str or int
        Agicap enterprise ID.
    account_id : str or int
        Agicap bank account ID.
    token : str, optional
        Bearer token; defaults to the 'AGICAP_TOKEN' naas secret.
    company_name, account_name : str, optional
        When given, inserted as leading 'ENTREPRISE' / 'COMPTE' columns.

    Returns
    -------
    (pandas.DataFrame, str)
        The flattened transactions (uppercase column names, ID columns
        prepended) and the last HTTP status message.
    """
    # Init pagination state.
    data = []
    skip = 0
    take = 100
    # Get token from the secret store when not provided.
    if not token:
        token = naas.secret.get('AGICAP_TOKEN')
    # Payload: fixed filters restricted to the requested bank account.
    payload = {
        "filters": {
            "nature": 1,
            "statusesToInclude": 0,
            "statusesToExclude": 0,
            "transactionWithProjectsDistributionStale": False,
            "includeTransactionWithoutProject": False,
            "transactionsType": 0,
            "categorizationState": 0,
            "bankAccountIds": [str(account_id)]
        },
        "sort": {"asc": False, "sortField": "0"},
        "pagination": {"skip": skip, "take": take}
    }
    url = "https://app.agicap.com/api/paidtransaction/GetByFilters"
    # Page through results until a short or empty page is returned.
    while True:
        result = []
        res = get_export(token, str(company_id), url, payload)
        if res.status_code == 200:
            message = f"{res.status_code} - Export successfull"
            # BUG FIX: 'Result' may be missing or null in the response body;
            # the original passed None to the for-loop below and crashed.
            result = res.json().get("Result") or []
        else:
            print("❌ Error while getting:", url)
            message = f"{res.status_code} - {res.text}"
            print(message)
        # Parse result to flatten df
        for r in result:
            data.append(flatten_dict(r))
        # Stop on empty page (or error), stop on short page, else advance.
        if len(result) == 0:
            print(f"No result found (skip={skip})")
            break
        elif len(result) < take:
            break
        else:
            skip += take
            payload["pagination"]["skip"] = skip
    # Create df
    df = pd.DataFrame(data).reset_index(drop=True)
    # Enrich transactions with identifying columns at the front.
    if len(df) > 0:
        i = 0
        df.columns = df.columns.str.upper()
        # Insert company info
        if company_name:
            df.insert(loc=i, column="ENTREPRISE", value=company_name)
            i += 1
        df.insert(loc=i, column="ENTREPRISE_ID", value=company_id)
        i += 1
        # Insert account info
        if account_name:
            df.insert(loc=i, column="COMPTE", value=account_name)
            i += 1
        df.insert(loc=i, column="COMPTE_ID", value=account_id)
    return df, message

df, message = get_transactions(
    enterprise_id,
    account_id,
    token
)
print("Rows:", len(df))
df.head(1)

Prep data

def get_projects_dict(df): # Init output = {} projects = df["PROJECTS"].astype(str).unique().tolist() # Loop on unique value for p in projects: if p != '[]': p = p.replace("'", '"') # replace single quotes with double quotes to make it a valid JSON string data = json.loads(p) # parse the string into a Python object title = data[0]['Title'] # get the "Title" output[p] = title else: output[p] = "" return output def create_database(df_init): # Init df = df_init.copy() # Keep columns to_keep = [ 'ENTREPRISE_ID', 'COMPTE_ID', 'ID', 'NAME', 'CATEGORY_ID', 'CATEGORY_NAME', 'PAYMENTDATE', 'PROJECTS', 'ISCASHINFLOW', 'AMOUNT_VALUE', 'AMOUNT_CURRENCY', ] df = df[to_keep] # Rename columns to_rename = { "ID": "TRANSACTION_ID", "NAME": "TRANSACTION_NAME", "AMOUNT_CURRENCY": "CURRENCY" } df = df.rename(columns=to_rename) # Get Projet name projects = get_projects_dict(df) df["PROJECTS"] = df["PROJECTS"].astype(str).map(projects) # Transform payment date df["DATE_ORDER"] = df["PAYMENTDATE"].astype(int) / 1000 df["PAYMENTDATE"] = pd.to_datetime((df["PAYMENTDATE"].astype(int) / 1000), unit="s").dt.tz_localize('UTC').dt.tz_convert('Europe/Paris') df["DATE"] = df["PAYMENTDATE"].dt.strftime("%d/%m/%Y") # Create VALUE column with sign df["VALUE"] = df.apply(lambda row: row["AMOUNT_VALUE"] * (-1) if not row["ISCASHINFLOW"] else row["AMOUNT_VALUE"], axis=1) # Drop useless columns to_drop = [ "PAYMENTDATE", "ISCASHINFLOW", "AMOUNT_VALUE" ] df = df.drop(to_drop, axis=1) return df.reset_index(drop=True) df_output = create_database(df) print(f"DB: {len(df_output)} rows, {len(df_output.columns)} columns") df_output.head(10)

Output

Save export to CSV

df_output.to_csv(output_csv, index=False)