First case
You want to import file A.py into file B.py, and these two files are in the same folder, like this:
.
├── A.py
└── B.py
You can do this in file B.py:
import A
or
from A import *
or
from A import THINGS_YOU_WANT_TO_IMPORT_IN_A
You will then be able to use all of the functions of A.py in B.py.
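For example, a minimal sketch, assuming A.py defines a hypothetical greet() function:
# A.py
def greet():
    return "hello from A"

# B.py
import A

print(A.greet())  # hello from A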
Second case
You want to import file folder/A.py into file B.py, and these two files are not in the same folder, like this:
.
├── B.py
└── folder
└── A.py
You can do this in file B.py:
import folder.A
or
from folder.A import *
or
from folder.A import THINGS_YOU_WANT_TO_IMPORT_IN_A
You will then be able to use all of the functions of A.py in B.py.
Summary
In the first case, `A.py` is a module that you import in `B.py`, using the syntax `import module_name`.
In the second case, `folder` is the package that contains the module `A.py`, and you use the syntax `import package_name.module_name`.
$ cd /home/python
$ python -m SimpleHTTPServer
That's it. Fire up your browser, and the files in the current directory can be seen at http://localhost:8000.
If the directory contains a file named index.html, that file is served as the initial page. Otherwise, all the files present in the directory are listed.
If port 8000 is already in use by another server, the above command also accepts an optional port number.
$ python -m SimpleHTTPServer <port_number>
This will start a server on the specified port.
For Python 3.x the command changes to:
$ python3 -m http.server
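The Python 3 server accepts an optional port number in the same way:
$ python3 -m http.server <port_number>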
$ python -m venv env-name
#### "windows activation"
c:\> env-name\Scripts\activate
#### "linux activation"
$ source env-name/bin/activate
#### "POSIX-compatible system activation"
$ . env-name/bin/activate
#### "deactivate"
$ deactivate
$ pip install -r requirements.txt
#### "add to requirements.txt"
$ pip freeze > requirements.txt
$ python3 -m ensurepip
$ sudo apt install python3-pip python3-dev
$ sudo -H pip3 install --upgrade pip
$ sudo -H pip3 install virtualenv
$ virtualenv testenv
#### "windows"
c:\> testenv\Scripts\activate
#### "linux"
$ source testenv/bin/activate
$ deactivate
def uppercase(func):
    def wrapper():
        original_result = func()
        modified_result = original_result.upper()
        return modified_result
    return wrapper

@uppercase
def hello():
    """Return a string."""
    return 'hello world'

print(hello())  # HELLO WORLD
import time

def my_timer(func):
    def _timer(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"Execution time: {end - start}")
        return result
    return _timer

@my_timer
def delayed_mean(sample):
    time.sleep(1)
    return sum(sample) / len(sample)

"""
How to apply the decorator:

@my_timer
def delayed_mean(sample):
    ...

or, equivalently:

delayed_mean = my_timer(delayed_mean)
"""
def trace(func):
    def wrapper(*args, **kwargs):
        print(f'TRACE: calling {func.__name__}() '
              f'with {args}, {kwargs}')
        original_result = func(*args, **kwargs)
        print(f'TRACE: {func.__name__}() '
              f'returned {original_result!r}')
        return original_result
    return wrapper

@trace
def say(name, line):
    return f'{name}: {line}'

>>> say('Jane', 'Hello, World')
TRACE: calling say() with ('Jane', 'Hello, World'), {}
TRACE: say() returned 'Jane: Hello, World'
'Jane: Hello, World'
#### * is used to unpack iterables (lists & tuples)
#### ** is used to unpack dictionaries
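A minimal sketch of unpacking into a function call (show() is a hypothetical helper):
def show(a, b, c):
    print(a, b, c)

args = [1, 2, 3]
kwargs = {"a": 1, "b": 2, "c": 3}
show(*args)     # 1 2 3
show(**kwargs)  # 1 2 3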
$ pip install --user poetry
$ poetry new my-project
# my-project/
# ├── README.rst
# ├── my_project
# │   └── __init__.py
# ├── pyproject.toml
# └── tests
#     ├── __init__.py
#     └── test_my_project.py
#### activate poetry venv
$ poetry shell
#### add package
$ poetry add <package-name>
#### Dockerfile
FROM python:3.9-slim
WORKDIR /app/
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY echo.py .
ENTRYPOINT ["python", "echo.py"]
#### alpine image (small & build from base image)
FROM alpine:3.13
WORKDIR /app/
RUN apk add --no-cache python3
COPY requirements.txt .
RUN apk add --no-cache py3-pip && \
    pip3 install -r requirements.txt && \
    apk del py3-pip
COPY echo.py .
CMD ["python", "echo.py"]
#### build image
$ docker build -t <name> <path>
#### image size
$ docker images
# REPOSITORY    TAG     IMAGE ID      CREATED              SIZE
# echo-alpine   latest  e7e3a2bc7b71  About a minute ago   53.7MB
# echo          latest  6b036d212e8f  40 minutes ago       126MB
#### run image
$ docker run -it --rm --publish 5000:5000 <image-name>
#### docker-compose.yml
version: '3.8'
services:
echo-server:
# this tell Docker Compose to build image from
# local (.) directory
build: .
# this is equivalent to "-p" option of
# the "docker run" command
ports:
- "5000:5000"
# this is equivalent to "-t" option of
# the "docker run" command
tty: true
environment:
- DATABASE_HOSTNAME=database
- DATABASE_PORT=5432
- DATABASE_PASSWORD=password
depends_on:
- database
command:
wait-for-it --service database:5432 --
#### or
#### watchmedo auto-restart --patterns "*.py" --recursive --
python echo.py
volumes:
- .:/app/
database:
image: postgres
restart: always
cache:
image: redis
#### wait-for-it is actually written in Python, so you can easily install it with pip.
#### wait-for-it by default times out after 15 seconds. After that timeout, it will start
#### the process after the -- mark regardless of whether it succeeded in connecting or
#### not. You can disable the timeout using the --timeout 0 argument. Without a timeout, wait-for-it will wait indefinitely.
#### watchmedo (shown in the alternative above) restarts the specified process whenever any
#### Python file in the current working directory changes. Install it with "$ pip install watchdog[watchmedo]"
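#### install wait-for-it (assuming the PyPI package name is "wait-for-it"):
$ pip install wait-for-it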
#### run/stop
$ docker-compose up
$ docker-compose down
>>> {1, 2, 3} & {1, 4}   # intersection
{1}
>>> {1, 2, 3} | {1, 4}   # union
{1, 2, 3, 4}
>>> {1, 2, 3} - {1, 4}   # difference
{2, 3}
>>> {1, 2, 3} ^ {1, 4}   # symmetric difference
{2, 3, 4}
dictionary_1 | dictionary_2  #### merge two dictionaries (Python 3.9+)
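#### example (keys from the right-hand dictionary win on conflict):
>>> {"a": 1, "b": 2} | {"b": 3, "c": 4}
{'a': 1, 'b': 3, 'c': 4}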
from typing import Any

def get_ci(d: dict, key: str) -> Any:
    for k, v in d.items():
        if key.lower() == k.lower():
            return v
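#### example: case-insensitive lookup with the get_ci() helper above
>>> get_ci({"Content-Type": "text/html"}, "content-type")
'text/html'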
def concatenate(*items, delim: str):
    return delim.join(items)
#### example
>>> concatenate("John", "Doe", delim=" ")
'John Doe'
>>> concatenate("Ronald", "Reuel", "Tolkien", delim=" ")
'Ronald Reuel Tolkien'
>>> concatenate("Jay", delim=" ")
'Jay'
>>> concatenate(delim=" ")
''
- Python, unlike Kotlin and many other languages, freely permits multiple inheritance (although it often isn't a good idea).
- Another important Python differentiator is the lack of private/public keywords that would control access to internal object attributes from outside the class definition.
#### simple class
class Point:
    def __init__(self, x, y):
        self.x = x
        self.y = y

#### Every attribute prefixed by __ (two underscores) within a class body
#### is renamed by the interpreter on the fly to _ClassName__attribute
#### (name mangling); this is how Python approximates private variables
class MyClass:
    def __init__(self):
        self.__secret_value = 1
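#### a quick REPL check of the name mangling described above:
>>> obj = MyClass()
>>> obj._MyClass__secret_value
1
>>> obj.__secret_value
Traceback (most recent call last):
  ...
AttributeError: 'MyClass' object has no attribute '__secret_value'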
- __set__(self, obj, value): This is called whenever the attribute is set.
- __get__(self, obj, owner=None): This is called whenever the attribute is read (referred to as a getter).
- __delete__(self, obj): This is called when del is invoked on the attribute.
#### Example
import random

class InitOnAccess:
    def __init__(self, init_func, *args, **kwargs):
        self.klass = init_func
        self.args = args
        self.kwargs = kwargs
        self._initialized = None

    def __get__(self, instance, owner):
        if self._initialized is None:
            print('initialized!')
            self._initialized = self.klass(*self.args, **self.kwargs)
        else:
            print('cached!')
        return self._initialized

class WithSortedRandoms:
    lazily_initialized = InitOnAccess(
        sorted, [random.random() for _ in range(10)]
    )
#### OutPut
#### >>> m = WithSortedRandoms()
#### >>> m.lazily_initialized
#### initialized!
#### [0.2592159616928279, 0.32590583255950756, 0.4015520901807743,
#### 0.4148447834912816, 0.4187058605495758, 0.4534290894962043,
#### 0.4796775578337028, 0.6963642650184283, 0.8449725511007807,
#### 0.8808174325885045]
#### >>> m.lazily_initialized
#### cached!
#### [0.2592159616928279, 0.32590583255950756, 0.4015520901807743,
#### 0.4148447834912816, 0.4187058605495758, 0.4534290894962043,
#### 0.4796775578337028, 0.6963642650184283, 0.8449725511007807,
#### 0.8808174325885045]
import time
from queue import Queue, Empty
from threading import Thread

import requests

SYMBOLS = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
BASES = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
THREAD_POOL_SIZE = 4

def fetch_rates(base):
    response = requests.get(f"https://api.vatcomply.com/rates?base={base}")
    response.raise_for_status()
    rates = response.json()["rates"]
    # note: same currency exchanges to itself 1:1
    rates[base] = 1.
    return base, rates

def present_result(base, rates):
    rates_line = ", ".join(
        [f"{rates[symbol]:7.03} {symbol}" for symbol in SYMBOLS]
    )
    print(f"1 {base} = {rates_line}")

def worker(work_queue, results_queue):
    while not work_queue.empty():
        try:
            item = work_queue.get_nowait()
        except Empty:
            break
        try:
            result = fetch_rates(item)
        except Exception as err:
            results_queue.put(err)
        else:
            results_queue.put(result)
        finally:
            work_queue.task_done()

def main():
    work_queue = Queue()
    results_queue = Queue()
    for base in BASES:
        work_queue.put(base)
    threads = [
        Thread(target=worker, args=(work_queue, results_queue))
        for _ in range(THREAD_POOL_SIZE)
    ]
    for thread in threads:
        thread.start()
    work_queue.join()
    while threads:
        threads.pop().join()
    while not results_queue.empty():
        result = results_queue.get()
        if isinstance(result, Exception):
            raise result
        present_result(*result)

if __name__ == "__main__":
    started = time.time()
    main()
    elapsed = time.time() - started
    print()
    print("time elapsed: {:.2f}s".format(elapsed))
Multithreading is challenging. Dealing with threads in a sane and safe manner requires a tremendous amount of code compared to the synchronous approach. We had to set up a thread pool and communication queues, gracefully handle exceptions from threads, and also worry about thread safety when trying to provide a rate-limiting capability. Dozens of lines of code are needed just to execute one function from an external library in parallel! And we rely on the promise from the external package's creator that their library is thread-safe.
from multiprocessing import Process
import os

def work(identifier):
    print(
        f'Hey, I am the process '
        f'{identifier}, pid: {os.getpid()}'
    )

def main():
    processes = [
        Process(target=work, args=(number,))
        for number in range(5)
    ]
    for process in processes:
        process.start()
    while processes:
        processes.pop().join()

if __name__ == "__main__":
    main()
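#### example output (ordering and pids will vary between runs):
# Hey, I am the process 0, pid: 9304
# Hey, I am the process 1, pid: 9305
# Hey, I am the process 2, pid: 9306
# ...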
import time
from multiprocessing import Pool

import requests

SYMBOLS = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
BASES = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
POOL_SIZE = 4

def fetch_rates(base):
    response = requests.get(
        f"https://api.vatcomply.com/rates?base={base}"
    )
    response.raise_for_status()
    rates = response.json()["rates"]
    # note: same currency exchanges to itself 1:1
    rates[base] = 1.
    return base, rates

def present_result(base, rates):
    rates_line = ", ".join(
        [f"{rates[symbol]:7.03} {symbol}" for symbol in SYMBOLS]
    )
    print(f"1 {base} = {rates_line}")

def main():
    with Pool(POOL_SIZE) as pool:
        results = pool.map(fetch_rates, BASES)
    for result in results:
        present_result(*result)

if __name__ == "__main__":
    started = time.time()
    main()
    elapsed = time.time() - started
    print()
    print("time elapsed: {:.2f}s".format(elapsed))
from multiprocessing import Pool as ProcessPool
from multiprocessing.dummy import Pool as ThreadPool

def main(use_threads=False):
    if use_threads:
        pool_cls = ThreadPool
    else:
        pool_cls = ProcessPool
    with pool_cls(POOL_SIZE) as pool:
        results = pool.map(fetch_rates, BASES)
    for result in results:
        present_result(*result)
#### The dummy threading pool can also be imported from the multiprocessing.pool module as the ThreadPool class.
#### It will have the same implementation;
#### the actual import path is just a matter of personal preference.
The easiest way to think about asynchronous programming in Python is to imagine something similar to threads, but without system-level scheduling involved.
import asyncio
import random

async def print_number(number):
    await asyncio.sleep(random.random())
    print(number)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(
        asyncio.gather(*[
            print_number(number)
            for number in range(10)
        ])
    )
    loop.close()
#### the * operator unpacks the list of coroutines into positional arguments
import asyncio
import time

import aiohttp

from asyncrates import get_rates

SYMBOLS = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
BASES = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')

def present_result(base, rates):
    rates_line = ", ".join(
        [f"{rates[symbol]:7.03} {symbol}" for symbol in SYMBOLS]
    )
    print(f"1 {base} = {rates_line}")

async def main():
    async with aiohttp.ClientSession() as session:
        for result in await asyncio.gather(*[
            get_rates(session, base)
            for base in BASES
        ]):
            present_result(*result)

if __name__ == "__main__":
    started = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    elapsed = time.time() - started
    print()
    print("time elapsed: {:.2f}s".format(elapsed))
Fortunately for us, the Python standard library provides the concurrent.futures module, which is also integrated with the asyncio module. These two modules together allow you to schedule blocking functions to execute in threads or additional processes as if they were asynchronous non-blocking coroutines.
The most important classes in the concurrent.futures module are Executor and Future.
The Executor class is a base class not intended for instantiation and has the following two concrete implementations:
- ThreadPoolExecutor: This one represents a pool of threads
- ProcessPoolExecutor: This one represents a pool of processes
Every executor provides the following three useful methods:
- submit(func, *args, **kwargs): This schedules the func function for execution in a pool of resources and returns a Future object representing the execution of the callable
- map(func, *iterables, timeout=None, chunksize=1): This executes the func function over an iterable in a similar way to the multiprocessing.Pool.map() method
- shutdown(wait=True): This shuts down the executor and frees all of its resources
>>> def loudly_return():
...     print("processing")
...     return 42
...
>>> from concurrent.futures import ThreadPoolExecutor
>>> with ThreadPoolExecutor(1) as executor:
...     future = executor.submit(loudly_return)
...
processing
>>> future
<Future at 0x33cbf98 state=finished returned int>
>>> future.result()
42
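executor.map() works over an iterable in the same spirit; a minimal sketch:
>>> with ThreadPoolExecutor(2) as executor:
...     results = list(executor.map(lambda x: x * 2, [1, 2, 3]))
...
>>> results
[2, 4, 6]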
We can easily defer the blocking call to a separate thread with the loop.run_in_executor() call, while still leaving the fetch_rates() function as an awaitable coroutine, as follows:
async def fetch_rates(base):
    loop = asyncio.get_event_loop()
    response = await loop.run_in_executor(
        None, requests.get,
        f"https://api.vatcomply.com/rates?base={base}"
    )
    response.raise_for_status()
    rates = response.json()["rates"]
    # note: same currency exchanges to itself 1:1
    rates[base] = 1.
    return base, rates
class Animal:
    """
    A class used to represent an Animal

    ...

    Attributes
    ----------
    says_str : str
        a formatted string to print out what the animal says
    name : str
        the name of the animal
    sound : str
        the sound that the animal makes
    num_legs : int
        the number of legs the animal has (default 4)

    Methods
    -------
    says(sound=None)
        Prints the animal's name and what sound it makes
    """

    says_str = "A {name} says {sound}"

    def __init__(self, name, sound, num_legs=4):
        """
        Parameters
        ----------
        name : str
            The name of the animal
        sound : str
            The sound the animal makes
        num_legs : int, optional
            The number of legs the animal has (default is 4)
        """
        self.name = name
        self.sound = sound
        self.num_legs = num_legs

    def says(self, sound=None):
        """Prints what the animal's name is and what sound it makes.

        If the argument `sound` isn't passed in, the default Animal
        sound is used.

        Parameters
        ----------
        sound : str, optional
            The sound the animal makes (default is None)

        Raises
        ------
        NotImplementedError
            If no sound is set for the animal or passed in as a
            parameter.
        """
        if self.sound is None and sound is None:
            raise NotImplementedError("Silent Animals are not supported!")
        out_sound = self.sound if sound is None else sound
        print(self.says_str.format(name=self.name, sound=out_sound))
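#### usage (the output follows from the says_str template above):
>>> animal = Animal("dog", "woof")
>>> animal.says()
A dog says woof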
Package docstrings should be placed at the top of the package's __init__.py file. This docstring should list the modules and sub-packages that are exported by the package.
Module docstrings are similar to class docstrings. Instead of classes and class methods being documented, it's now the module and any functions found within it. Module docstrings are placed at the top of the file, even before any imports. Module docstrings should include the following:
- A brief description of the module and its purpose
- A list of any classes, exceptions, functions, and any other objects exported by the module
The docstring for a module function should include the same items as a class method:
- A brief description of what the function is and what it's used for
- Any arguments (both required and optional) that are passed, including keyword arguments
- Labels for any arguments that are considered optional
- Any side effects that occur when executing the function
- Any exceptions that are raised
- Any restrictions on when the function can be called
"""Spreadsheet Column Printer
This script allows the user to print to the console all columns in the
spreadsheet. It is assumed that the first row of the spreadsheet is the
location of the columns.
This tool accepts comma separated value files (.csv) as well as Excel
(.xls, .xlsx) files.
This script requires that `pandas` be installed within the Python
environment you are running this script in.
This file can also be imported as a module and contains the following
functions:
* get_spreadsheet_cols - returns the column headers of the file
* main - the main function of the script
"""
import argparse

import pandas as pd

def get_spreadsheet_cols(file_loc, print_cols=False):
    """Gets and prints the spreadsheet's header columns

    Parameters
    ----------
    file_loc : str
        The file location of the spreadsheet
    print_cols : bool, optional
        A flag used to print the columns to the console (default is
        False)

    Returns
    -------
    list
        a list of strings that are the header columns
    """
    file_data = pd.read_excel(file_loc)
    col_headers = list(file_data.columns.values)
    if print_cols:
        print("\n".join(col_headers))
    return col_headers

def main():
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        'input_file',
        type=str,
        help="The spreadsheet file to print the columns of"
    )
    args = parser.parse_args()
    get_spreadsheet_cols(args.input_file, print_cols=True)

if __name__ == "__main__":
    main()
class Circle:
    def __init__(self, radius):
        self.radius = radius
    # Class implementation...

class Square:
    def __init__(self, side):
        self.side = side
    # Class implementation...

def shape_factory(shape_name, *args, **kwargs):
    shapes = {"circle": Circle, "square": Square}
    return shapes[shape_name](*args, **kwargs)
"""
>>> circle = shape_factory("circle", radius=20)
>>> type(circle)
<class '__main__.Circle'>
>>> circle.radius
20
>>> square = shape_factory("square", side=10)
>>> type(square)
<class '__main__.Square'>
>>> square.side
10
"""
def gen():
    yield 1
    yield 2
    return 3
"""
>>> g = gen()
>>> g
<generator object gen at 0x7f4ff4853c10>
>>> next(g)
1
>>> next(g)
2
>>> next(g)
Traceback (most recent call last):
  File "<input>", line 1, in <module>
    next(g)
StopIteration: 3
"""